Skip to content

Commit

Permalink
add aggregations to log2sql conversion #34
Browse files Browse the repository at this point in the history
  • Loading branch information
matyaskopp committed Nov 15, 2021
1 parent 4e86a8e commit b3891b4
Showing 1 changed file with 114 additions and 6 deletions.
120 changes: 114 additions & 6 deletions scripts/log2sql.pl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use File::Spec;
use File::Basename;
use Digest::MD5;
use DateTime::Format::Strptime;



Expand All @@ -20,6 +21,11 @@
my @services;
my @print_sql;

my $strp = DateTime::Format::Strptime->new(
pattern => '%d/%b/%Y:%H:%M:%S %z',
on_error => 'croak',
);

GetOptions (
'in-files=s{1,}' => \@logfiles,
'out-dir=s' => \$outdir,
Expand Down Expand Up @@ -83,6 +89,11 @@
my ($first_line_checksum, $last_read_line_checksum, $lines_read, $lines_valid) = (undef, undef, 0, 0);
open LOG, "<$log_file_path" or die "Could not open $log_file_path: $!";
my $prev_date='';
my $prev_time='';
my $aggr_data = {};
$aggr_data->{month}={};
my $first_datetime;
my $last_datetime;
while(my $line = <LOG>){
$lines_read++;
$line =~ s/\n$//;
Expand All @@ -102,38 +113,80 @@
}
}
next unless defined $service_id;
my ($act_date) = join('-',split('/',substr($time_local,0,11)));
unless($act_date eq $prev_date){
my $act_date = $time_local;
$act_date =~ s/\d{2}:\d{2}:\d{2} \+/00:00:00 \+/;
my $act_time = $time_local;
$act_time =~ s/:\d{2}:\d{2} \+/:00:00 \+/;

unless($act_date eq $prev_date){
unless($lines_valid){
close DUMP;
close DUMP_AGGR;
}
my $dump_file = "$file_name.$act_date.dump";
print_aggregated_data(\*DUMP_AGGR, $aggr_data,$prev_date,'day');
$aggr_data->{day}={};
my $date_filename = join('-',split('/',substr($act_date,0,11)));
my $dump_file = "$file_name.$date_filename.dump";
my $dump_aggr_file = "$file_name.$date_filename.aggr.dump";
push @print_sql,"
DO
\$\$
BEGIN
RAISE NOTICE '\%', NOW();
RAISE NOTICE '--------------[\%]----------------', NOW();
RAISE NOTICE 'starting importing from $dump_file';
RAISE NOTICE 'log file line: $lines_read';
END;
\$\$;
";
push @print_sql,"\\copy log_file_entries(file_id, service_id, line_number, line_checksum, remote_addr, remote_user, time_local, method, request, protocol, status, body_bytes_sent, http_referer, http_user_agent, unit) from '$dump_file'";
open DUMP, ">".File::Spec->catfile($outdir,$dump_file) or die "Could not open $dump_file: $!";

push @print_sql,"DO \$\$ BEGIN RAISE NOTICE 'IMPORTING HOUR AND DAY AGGR--------------[\%]----------------', NOW();END;\$\$;";
push @print_sql,"\\copy log_ip_aggr(period_start_date, period_end_date, period_level, ip, service_id, cnt_requests, cnt_units, cnt_body_bytes_sent) from '$dump_aggr_file'";
open DUMP_AGGR, ">".File::Spec->catfile($outdir,$dump_aggr_file) or die "Could not open $dump_aggr_file: $!";

$prev_date = $act_date;
$first_datetime = $time_local unless $first_datetime;
$last_datetime = $time_local;
}
unless($act_time eq $prev_time){
print_aggregated_data(\*DUMP_AGGR, $aggr_data, $prev_time,'hour');
$aggr_data->{hour}={};
$prev_time = $act_time;
}


$lines_valid++;
## print STDERR "$service_id: $request\n";
print DUMP join("\t", ($file_id,$service_id,$lines_valid,$last_read_line_checksum,$remote_addr,$remote_user,$time_local, $method, $request, $protocol, $status, $body_bytes_sent,$http_referer, $http_user_agent, $unit)),"\n";
for my $ip ($remote_addr, '\N') {
for my $service ($service_id, '\N'){
aggregate_data($aggr_data, $ip, $service, 1, $unit, $body_bytes_sent);
}
}
}
}

print_aggregated_data(\*DUMP_AGGR, $aggr_data,$prev_time,'hour');
print_aggregated_data(\*DUMP_AGGR, $aggr_data,$prev_date,'day');
close DUMP;
close DUMP_AGGR;
close LOG;

my $dump_aggr_file = "$file_name.aggr.dump";
open DUMP_AGGR, ">".File::Spec->catfile($outdir,$dump_aggr_file) or die "Could not open $dump_aggr_file: $!";
my $act_month = $prev_date; $act_month =~ s/^../01/;
print_aggregated_data(\*DUMP_AGGR, $aggr_data,$act_month,'month');
push @print_sql,"
DO \$\$
BEGIN RAISE NOTICE 'IMPORTING MONTH AGGR--------------[\%]----------------', NOW();
RAISE NOTICE 'importing month aggregations $act_month'; END; \$\$;";
push @print_sql,"\\copy log_ip_aggr(period_start_date, period_end_date, period_level, ip, service_id, cnt_requests, cnt_units, cnt_body_bytes_sent) from '$dump_aggr_file'";
close DUMP_AGGR;

open SQL, ">".File::Spec->catfile($outdir,$sql_file) or die "Could not open $sql_file: $!";
print SQL " -- $log_file sql dump
DO \$\$ BEGIN RAISE NOTICE 'STARTED--------------[\%]----------------', NOW(); END; \$\$;
ALTER TABLE log_file_entries DISABLE TRIGGER log_files_lines_read;
ALTER TABLE log_file_entries DISABLE TRIGGER log_files_lines_read_aggr;
Expand All @@ -147,9 +200,64 @@ BEGIN
print SQL "
ALTER TABLE log_file_entries ENABLE TRIGGER log_files_lines_read;
ALTER TABLE log_file_entries ENABLE TRIGGER log_files_lines_read_aggr;
DO \$\$ BEGIN RAISE NOTICE 'FINISHED--------------[\%]----------------', NOW(); END; \$\$;
";
print SQL "DO \$\$ BEGIN RAISE NOTICE 'TODO: run aggregations for::: SELECT * FROM log_files_entries WHERE file_id=$file_id'; END; \$\$; \n";

print STDERR "imported period: $first_datetime -- $last_datetime\n";

close SQL;

$first_datetime =~ s/:\d{2}:\d{2} \+/:00:00 \+/;
$last_datetime =~ s/:\d{2}:\d{2} \+/:00:00 \+/;
$last_datetime = $strp->parse_datetime($last_datetime)->add(hours => 1)->strftime('%d/%b/%Y:%H:%M:%S %z');
print STDERR "aggregated period: $first_datetime -- $last_datetime\n";

for my $bound (['lower', $first_datetime],['upper',$last_datetime]){
print STDERR "TESTING ",$bound->[0],": ",$bound->[1],"\n";
my $dt = $bound->[1];
for my $level (qw/hour day month/){
$sql="
SELECT count(1) AS cnt
FROM log_ip_aggr
WHERE
period_start_date <= '$dt'
AND period_end_date > '$dt'
AND period_level = '$level'::period_levels;
";
$sth = $dbi->prepare($sql);
$sth->execute;
if(my $result = $sth->fetchrow_hashref){
print STDERR " WARNING: ",$result->{cnt}," records cross boundary on $level level in existing database !!! \n\t-> IT PRODUCES DUPLICITIES WHEN IMPORT THIS\n" if $result->{cnt};
}
}
}
}


$dbi->disconnect();


sub print_aggregated_data{
my ($fh,$aggr_data, $start_time, $period) = @_;
return unless $start_time;
my $end_time = $strp->parse_datetime($start_time);
$end_time->add("${period}s" => 1);
$end_time = $end_time->strftime('%d/%b/%Y:%H:%M:%S %z');
for my $ip (keys %{$aggr_data->{$period}}) {
for my $service (keys %{$aggr_data->{$period}->{$ip}}){
print $fh join("\t",$start_time, $end_time, $period,$ip,$service,@{$aggr_data->{$period}->{$ip}->{$service}}),"\n";
}
}
}

$dbi->disconnect();
sub aggregate_data{
my ($data, $ip, $service, $request, $unit, $body_bytes_sent) = @_;
for my $period (qw/hour day month/){
$data->{$period}->{$ip} //={};
$data->{$period}->{$ip}->{$service} //= [0, 0, 0];
$data->{$period}->{$ip}->{$service}->[0] += 1; ## requests
$data->{$period}->{$ip}->{$service}->[1] += $unit; ## units
$data->{$period}->{$ip}->{$service}->[2] += $body_bytes_sent; ## body_bytes_sent
}
}

0 comments on commit b3891b4

Please sign in to comment.