Skip to content

Commit

Permalink
Add proper logger to consumer and handlers
Browse files Browse the repository at this point in the history
Create (hidden) package PGXN::API::Consumer::Log and set it up with log
level as the first argument to its `log` method, and suppress logging
according to verbosity. Teach Consumer to use it, then also teach the
Mastodon and Twitter handlers. Add tests to ensure it all works,
including time mocking in the Mastodon and Twitter tests.

Note that the handlers no longer throw an error on failure, but simply
log it. The Consumer still wraps the handlers in `try`/`catch` in order
to log any unexpected errors and continue runnnig.

This will mean additional log information from the handlers, in
particular, which did no logging previously. Will be helpful when
something happens to the consumer and we can see in the log what the
last release to be processed was.

While at it, remove unnecessary uses of Carp.
  • Loading branch information
theory committed Feb 16, 2024
1 parent 70fc891 commit 7dc7eb3
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 117 deletions.
3 changes: 3 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Revision history for Perl extension PGXN::Manager
output.
- Configured handlers for INT, TERM, and QUIT signals to log flagging
for shutdown in the next loop.
- Fixed invalid license example in the META spec.
- Added a logger to the Consumer and the Mastodon and Twitter handlers,
so that they now log debug and info messages about what's being sent.

0.31.1 2023-10-07T21:40:53Z
- Restored the writing of the pgxn_consumer PID file, accidentally
Expand Down
134 changes: 79 additions & 55 deletions lib/PGXN/Manager/Consumer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,34 @@ use Moose;
use JSON::XS;
use Try::Tiny;
use Encode qw(encode_utf8);
use Carp;
use Time::HiRes qw(sleep);
use PGXN::Manager;
use Proc::Daemon;
use IO::File;
use POSIX ();
use Fcntl qw(:flock);
use Cwd;
use namespace::autoclean;

our $VERSION = v0.31.2;
use constant CHANNELS => qw(release new_user new_mirror);

has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has interval => (is => 'ro', isa => 'Num', required => 1, default => 5);
has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has interval => (is => 'ro', isa => 'Num', required => 1, default => 5);
has continue => (is => 'rw', isa => 'Bool', required => 1, default => 1);
has pid_file => (is => 'rw', isa => 'Str');
has log_fh => (is => 'ro', isa => 'IO::Handle', required => 1, default => sub {
_log_fh()
});
has logger => (
is => 'ro',
isa => 'PGXN::Manager::Consumer::Log',
required => 1,
lazy => 1,
default => sub { PGXN::Manager::Consumer::Log->new(verbose => $_[0]->verbose) },
);

has conn => (is => 'ro', isa => 'DBIx::Connector', lazy => 1, default => sub {
# Use our own connetion instead of $pgxn->conn in order to add the callback.
my $self = shift;
my $cb = $self->verbose ? sub {
$_[0]->do("LISTEN pgxn_$_") for CHANNELS;
$self->log("INFO: Listening on ", join ', ', map { s/^pgxn_//r } @{
$self->log(INFO => 'Listening on ', join ', ', map { s/^pgxn_//r } @{
$_[0]->selectcol_arrayref('SELECT * FROM pg_listening_channels()')
});
return;
Expand All @@ -48,15 +50,6 @@ has conn => (is => 'ro', isa => 'DBIx::Connector', lazy => 1, default => sub
});
});

sub _log_fh {
my $fn = shift;
my $fh = $fn ? IO::File->new($fn, '>>:utf8')
: IO::Handle->new_from_fd(fileno STDOUT, 'w');
binmode $fh, ":utf8";
$fh->autoflush(1);
$fh;
}

sub go {
my $class = shift;
my $cfg = $class->_config;
Expand All @@ -69,16 +62,19 @@ sub go {
);
if (my $pid = $daemon->Init) {
my $pid_file = $cfg->{'pid-file'} || 'STDOUT';
_log(
_log_fh($cfg->{'log-file'}),
"INFO: Forked PID $pid written to $pid_file",
);
PGXN::Manager::Consumer::Log->new(
verbose => $cfg->{verbose},
file => $cfg->{'log-file'},
)->log(INFO => "Forked PID $pid written to $pid_file");
return 0;
}
}

# In the child process. Set up log file handle and go.
$cfg->{log_fh} = _log_fh delete $cfg->{'log-file'};
# In the child process. Set up logger and go.
$cfg->{logger} = PGXN::Manager::Consumer::Log->new(
verbose => $cfg->{verbose},
file => delete $cfg->{'log-file'},
);
$cfg->{pid_file} = delete $cfg->{'pid-file'} if exists $cfg->{'pid-file'};
my $cmd = $class->new( $cfg );
$SIG{$_} = $cmd->_signal_handler($_) for qw(TERM INT QUIT);
Expand All @@ -88,7 +84,7 @@ sub go {
sub _signal_handler {
my ($self, $sig) = @_;
return sub {
$self->log("INFO: $sig signal caught; flagging shutdown for next loop");
$self->log(INFO => "$sig signal caught; flagging shutdown for next loop");
$self->continue(0);
};
}
Expand All @@ -102,10 +98,10 @@ sub DEMOLISH {

sub run {
my $self = shift;
$self->log(sprintf "INFO: Starting %s %s", __PACKAGE__, __PACKAGE__->VERSION);
$self->log(INFO => sprintf "Starting %s %s", __PACKAGE__, __PACKAGE__->VERSION);
my $pgxn = PGXN::Manager->instance;
my $cfg = $pgxn->config->{consumers} || do {
$self->log("WARN: No consumers configured; messages will be dropped");
$self->log(WARN => 'No consumers configured; messages will be dropped');
undef
};

Expand All @@ -118,7 +114,7 @@ sub run {
sleep($self->interval);
}

$self->log("INFO: Shutting down");
$self->log(INFO => 'Shutting down');
return 0;
}

Expand All @@ -129,17 +125,17 @@ sub load_consumers {
my $type = delete $cfg->{type}
or die "No type specified for event consumer\n";
my $pkg = __PACKAGE__ . "::$type";
$self->log("INFO: Loading $pkg") if $self->verbose > 1;
$self->log(DEBUG => "Loading $pkg");
eval "use $pkg";
die "Error loading $pkg: $@\n" if $@;
my $events = delete $cfg->{events};
my $consumer = $pkg->new(
verbose => $self->verbose,
config => $cfg,
logger => $self->logger,
config => $cfg,
);

for my $e (@{ $events }) {
$self->log("INFO: Configuring $pkg for $e") if $self->verbose > 1;
$self->log(DEBUG => "Configuring $pkg for $e");
push @{ $consumers{$e} ||= [] } => $consumer;
}
}
Expand All @@ -154,57 +150,45 @@ sub consume {
# Notify payload treated as UTF-8 text, so already decoded from UTF-8 bytes.
my $json = JSON::XS->new->utf8(0);
my $dbh = shift;
$self->log("INFO: Consuming") if $self->verbose > 2;
$self->log(DEBUG => 'Consuming');
while (my $notify = $dbh->pg_notifies) {
my ($name, $pid, $msg) = @{ $notify };
$self->log("INFO: Received “$name” event from PID $pid")
if $self->verbose;
$self->log(INFO => "Received “$name” event from PID $pid");
unless ($name =~ s/^pgxn_//) {
$self->log("WARN: Unknown channel “$name”; skipping");
$self->log(WARN => "Unknown channel “$name”; skipping");
next;
}
my $handlers = $consumers_for->{$name} || do {
$self->log(
"INFO: No handlers configured for ",
"pgxn_$name channel; skipping",
)if $self->verbose;
$self->log(INFO =>
"No handlers configured for pgxn_$name channel; skipping",
);
next;
};

# Decode the JSON payload;
my $meta = try {
$json->decode($msg);
} catch {
$self->log("ERORR: Cannot decode JSON: $_");
$self->log(ERORR => "Cannot decode JSON: $_");
undef;
};
next unless $meta;

# Run all the handlers.
for my $h (@{ $handlers }) {
$self->log("INFO: Sending to ", $h->name, " handler")
if $self->verbose;
$self->log(INFO => 'Sending to ', $h->name, " handler");
try { $h->handle($name, $meta) }
catch { $self->log("ERROR: $_") };
catch { $self->log(ERROR => $_) };
}
}
});
} catch {
$self->log("ERROR: $_");
$self->log(ERROR => $_);
};
return 1;
}

sub log {
_log(shift->log_fh, @_);
}

sub _log {
my $fh = shift;
flock $fh, LOCK_EX;
say {$fh} POSIX::strftime('%Y-%m-%dT%H:%M:%SZ - ', gmtime), join '', @_;
flock $fh, LOCK_UN;
}
sub log { shift->logger->log(@_) }

sub _config {
my $self = shift;
Expand Down Expand Up @@ -262,6 +246,46 @@ sub _pod2usage {
);
}

package
PGXN::Manager::Consumer::Log;

use 5.10.0;
use strict;
use warnings;
use utf8;
use Moose;
use Fcntl qw(:flock);

has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has log_fh => (is => 'ro', isa => 'FileHandle', required => 1, default => sub {
_log_fh()
});

around BUILDARGS => sub {
my ($orig, $class, %args) = @_;
my $fn = delete $args{file} or return $class->$orig(%args);
return $class->$orig(%args, log_fh => _log_fh($fn));
};

sub log {
my ($self, $level) = (shift, shift);
return if $level eq 'INFO' && $self->verbose < 1;
return if $level eq 'DEBUG' && $self->verbose < 2;
my $fh = $self->log_fh;
flock $fh, LOCK_EX;
say {$fh} POSIX::strftime("%Y-%m-%dT%H:%M:%SZ - $level: ", gmtime), join '', @_;
flock $fh, LOCK_UN;
}

sub _log_fh {
my $fn = shift;
my $fh = $fn ? IO::File->new($fn, '>>:utf8')
: IO::Handle->new_from_fd(fileno STDOUT, 'w');
binmode $fh, ":utf8";
$fh->autoflush(1);
$fh;
}

1;
__END__
Expand Down
18 changes: 14 additions & 4 deletions lib/PGXN/Manager/Consumer/mastodon.pm
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ our $VERSION = v0.31.2;
has server => (is => 'ro', required => 1, isa => 'Str');
has ua => (is => 'ro', required => 1, isa => 'LWP::UserAgent');
has delay => (is => 'ro', required => 0, isa => 'Int', default => 0);
has logger => (is => 'ro', required => 1, isa => 'PGXN::Manager::Consumer::Log');

around BUILDARGS => sub {
my ($orig, $class, %args) = @_;
Expand Down Expand Up @@ -50,7 +51,13 @@ my %EMOJI = (

sub handle {
my ($self, $type, $meta) = @_;
return unless $type eq 'release';
if ($type ne 'release') {
$self->logger->log(DEBUG => "Mastodon skiping $type notification");
return;
};

my $release = lc "$meta->{name}-$meta->{version}";
$self->logger->log(INFO => "Posting $release to Mastodon");

my $link = PGXN::Manager->instance->config->{release_permalink};
my $url = URI::Template->new($link)->process({
Expand All @@ -59,7 +66,7 @@ sub handle {
});

my %emo = map { $_ => $EMOJI{$_}[rand @{ $EMOJI{$_} }] } keys %EMOJI;
$self->toot(join("\n\n",
$self->toot($release, join("\n\n",
"$emo{send} Released: $meta->{name} $meta->{version}",
"$emo{info} $meta->{abstract}",
"$emo{user} By $meta->{user}",
Expand All @@ -68,7 +75,7 @@ sub handle {
}

sub toot {
my ($self, $body) = @_;
my ($self, $release, $body) = @_;

my $res = $self->ua->post(
$self->server . '/api/v1/statuses',
Expand All @@ -78,7 +85,10 @@ sub toot {
},
);
return 1 if $res->is_success;
die "Error posting to Mastodon: " . $res->decoded_content . "\n";

$self->logger->log(ERROR =>
"Error posting $release to Mastodon: " . $res->decoded_content
);
}

sub scheduled_at {
Expand Down
20 changes: 16 additions & 4 deletions lib/PGXN/Manager/Consumer/twitter.pm
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ our $VERSION = v0.31.2;

subtype MaybeTwitterAPI => as maybe_type class_type 'Net::Twitter::Lite';

has verbose => (is => 'ro', required => 1, isa => 'Int', default => 0);
has client => (is => 'ro', required => 1, isa => 'Net::Twitter::Lite');
has logger => (is => 'ro', required => 1, isa => 'PGXN::Manager::Consumer::Log');

around BUILDARGS => sub {
my ($orig, $class, %args) = @_;
Expand Down Expand Up @@ -45,11 +45,16 @@ around BUILDARGS => sub {
};

sub handle {
my ($self, $channel, $meta) = @_;
return unless $channel eq 'release';
my ($self, $type, $meta) = @_;
if ($type ne 'release') {
$self->logger->log(DEBUG => "Twitter skiping $type notification");
return;
};

my $client = $self->client or return;
my $pgxn = PGXN::Manager->instance;

$self->logger->log(DEBUG => "Fetching Twitter username for $meta->{user}");
my $nick = $pgxn->conn->run(sub {
shift->selectcol_arrayref(
'SELECT twitter FROM users WHERE nickname = ?',
Expand All @@ -59,11 +64,18 @@ sub handle {

$nick = $nick ? "\@$nick" : $meta->{user};

my $release = lc "$meta->{name}-$meta->{version}";
$self->logger->log(INFO => "Posting $release to Twitter");

my $url = URI::Template->new($pgxn->config->{release_permalink})->process({
dist => lc $meta->{name},
version => lc $meta->{version},
});
$client->update( "$meta->{name} $meta->{version} released by $nick: $url" );
try {
$client->update( "$meta->{name} $meta->{version} released by $nick: $url" );
} catch {
$self->logger->log(ERROR => "Error posting $release to Twitter: $_");
};
}

1;
Expand Down
3 changes: 1 addition & 2 deletions lib/PGXN/Manager/Maint.pm
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use File::Spec;
use File::Path qw(make_path remove_tree);
use File::Basename qw(dirname basename);
use Encode qw(encode_utf8);
use Carp;
use namespace::autoclean;

our $VERSION = v0.31.2;
Expand Down Expand Up @@ -50,7 +49,7 @@ sub run {
my ($self, $command) = (shift, shift);
$command =~ s/-/_/g;
my $meth = $self->can($command)
or croak qq{PGXN Maint: "$command" is not a command};
or die qq{PGXN Maint: "$command" is not a command};
require PGXN::Manager;
$self->$meth(@_);
}
Expand Down
Loading

0 comments on commit 7dc7eb3

Please sign in to comment.