diff --git a/Dockerfile b/Dockerfile index 156ede64..de3e493e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,6 +37,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libreadonly-xs-perl \ libroman-perl \ libtest-class-perl \ + libtest-exception-perl \ libtest-mockobject-perl \ libtest-most-perl \ libtest-spec-perl \ diff --git a/bin/expire_backups.pl b/bin/expire_backups.pl index 34591392..b1fede77 100644 --- a/bin/expire_backups.pl +++ b/bin/expire_backups.pl @@ -11,17 +11,31 @@ use HTFeed::BackupExpiration; my $dry_run = 0; # -d +my $job_size = 10000; # -j +my $limit = undef; # --limit my $storage_name = undef; # -s +my $workers = 1; my $help = 0; GetOptions( 'dry-run|d' => \$dry_run, + 'job-size|j=i' => \$job_size, + 'limit|l=i' => \$limit, 'storage|s=s' => \$storage_name, + 'workers|w=i' => \$workers, 'help|?' => \$help ) or pod2usage(2); pod2usage(1) if $help; -my $exp = HTFeed::BackupExpiration->new(storage_name => $storage_name, dry_run => $dry_run); +$workers = 1 if $workers < 1; + +my $exp = HTFeed::BackupExpiration->new( + dry_run => $dry_run, + job_size => $job_size, + limit => $limit, + max_workers => $workers, + storage_name => $storage_name, +); $exp->run(); __END__ @@ -32,8 +46,10 @@ =head1 NAME =head1 SYNOPSIS -expire_backups.pl [--dry-run] -s STORAGE_NAME - - STORAGE_NAME - storage class name matched against feed_backups.storage_name +expire_backups.pl [--dry-run] [--job-size JOB_SIZE] [--limit LIMIT] [--workers WORKER_COUNT] -s STORAGE_NAME + JOB_SIZE - number of objects to delete, per worker. Default 10,000. + LIMIT - maximum rows of feed_backups to fetch per run. Default is no SQL LIMIT. Can be 0. + STORAGE_NAME - storage class name matched against feed_backups.storage_name + WORKER_COUNT - maximum number of subprocesses to spawn =cut diff --git a/bin/expire_versions.pl b/bin/expire_versions.pl new file mode 100644 index 00000000..4a214eca --- /dev/null +++ b/bin/expire_versions.pl @@ -0,0 +1,58 @@ +#!/usr/bin/perl + +use warnings; +use strict; + +use FindBin; +use Getopt::Long qw(:config no_ignore_case); +use Pod::Usage; +use YAML::XS; + +use lib "$FindBin::Bin/../lib"; +use HTFeed::BackupExpirationBatch; +use HTFeed::Log { root_logger => 'INFO, screen' }; + +my $dry_run = 0; # -d +my $storage_config_file = undef; # -c +my $storage_name = undef; # -s +my $help = 0; + +GetOptions( + 'config|c=s' => \$storage_config_file, + 'dry-run|d' => \$dry_run, + 'storage|s=s' => \$storage_name, + 'help|?' => \$help +) or pod2usage(2); +pod2usage(1) if $help; + +if (scalar @ARGV != 1) { + die "path to job file is required"; +} + +my $storage_config = undef; +if ($storage_config_file) { + $storage_config = YAML::XS::LoadFile($storage_config_file); +} + +my $exp = HTFeed::BackupExpirationBatch->new( + dry_run => $dry_run, + job_file => $ARGV[0], + storage_config => $storage_config, + storage_name => $storage_name +); +$exp->run(); + +__END__ + +=head1 NAME + + expire_versions.pl - remove a batch of superseded material from backup storage. + +=head1 SYNOPSIS + +expire_versions.pl [--config STORAGE_CONFIG_FILE] [--dry-run] -s STORAGE_NAME PATH_TO_JOB_FILE + + STORAGE_CONFIG_FILE - path to a YAML file with config hash for STORAGE_NAME + STORAGE_NAME - storage class name matched against feed_backups.storage_name + PATH_TO_JOB_FILE - path to a TSV file with format namespaceidversion +=cut diff --git a/lib/HTFeed/BackupExpiration.pm b/lib/HTFeed/BackupExpiration.pm index 05611840..a37657e7 100644 --- a/lib/HTFeed/BackupExpiration.pm +++ b/lib/HTFeed/BackupExpiration.pm @@ -3,28 +3,69 @@ package HTFeed::BackupExpiration; use strict; use warnings; + use HTFeed::Config qw(get_config); use HTFeed::DBTools qw(get_dbh); use HTFeed::Volume; -use Carp; + +use Carp (); +use File::Spec (); +use File::Temp (); use Log::Log4perl qw(get_logger); -use File::Temp; +use YAML::XS (); + +my $select_expired_sql = <<~'SQL'; + SELECT namespace,id + FROM feed_backups + WHERE deleted IS NULL + AND storage_name=? + AND version < DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 180 DAY),"%Y%m%d%H%i%S") + GROUP BY namespace,id + HAVING COUNT(*) > 1 +SQL + +my $select_versions_sql = <<~'SQL'; + SELECT version + FROM feed_backups + WHERE deleted IS NULL + AND storage_name=? + AND namespace=? + AND id=? + AND version < DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 180 DAY),"%Y%m%d%H%i%S") + ORDER BY version DESC +SQL -use HTFeed::Storage::PrefixedVersions; -use HTFeed::Storage::ObjectStore; sub new { my $class = shift; my $self = { storage_name => undef, + custom_storage_config => 0, + max_workers => 8, + job_size => 10000, + limit => undef, @_ }; unless ($self->{storage_name}) { - croak "$class cannot be constructed without a storage name"; + Carp::croak "$class cannot be constructed without a storage name"; + } + + # Test can init with `storage_config` because it is transient and must be known to workers. + # Production just reads the config as normal + if ($self->{storage_config}) { + $self->{custom_storage_config} = 1; + } else { + my $config = get_config('storage_classes'); + my $storage_config = $config->{$self->{storage_name}}; + die("Can't find storage configuration for " . $self->{storage_name}) unless $storage_config; + $self->{storage_config} = $storage_config; } + $self->{temp_directory} = File::Temp->newdir; + $self->{workers} = {}; + bless($self, $class); return $self; } @@ -32,74 +73,118 @@ sub new { sub run { my $self = shift; - my $dry_run = $self->{dry_run}; - my $dry_run_text = ""; - $dry_run_text = " (DRY RUN)" if $dry_run; - - my $config = get_config('storage_classes'); - my $storage_config = $config->{$self->{storage_name}}; - die("Can't find storage configuration for " . $self->{storage_name}) unless $storage_config; - - # find everything with more than one version that is at least 6 months old - # delete all but the most recent > 6 months old version - my $sth = get_dbh()->prepare(<<'SQL'); - SELECT namespace,id - FROM feed_backups - WHERE deleted IS NULL - AND storage_name=? - AND version < DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 180 DAY),"%Y%m%d%H%i%S") - GROUP BY namespace,id - HAVING COUNT(*) > 1 -SQL - - my $versions_sth = get_dbh()->prepare(<<'SQL'); - SELECT version - FROM feed_backups - WHERE deleted IS NULL - AND storage_name=? - AND namespace=? - AND id=? - AND version < DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 180 DAY),"%Y%m%d%H%i%S") - ORDER BY version DESC -SQL + # Write storage config to the temp directory for child processes to get at it. + # Unnecessary for production, needed for testing because it is generated as part + # of the test suite. + if ($self->{custom_storage_config}) { + $self->{storage_config_file} = File::Spec->catfile($self->{temp_directory}, 'storage_config.yml'); + my $yaml = YAML::XS::Dump($self->{storage_config}); + open(my $fh, '>', $self->{storage_config_file}) or die "Could not open storage config YAML $!"; + print $fh $yaml; + close $fh; + } - my $update_sth = get_dbh()->prepare(<<'SQL'); - UPDATE feed_backups SET deleted=1 - WHERE namespace=? - AND id=? - AND version=? - AND storage_name=? -SQL + my $sth = get_dbh()->prepare($self->select_expired_sql); + my $versions_sth = get_dbh()->prepare($select_versions_sql); + my $job = []; + # Iterate over the entirety of feed_backups + # Reaching the end and restarting the query must take place at a + # higher level, perhaps with $self->run called repeatedly. $sth->execute($self->{storage_name}); while (my $row = $sth->fetchrow_hashref) { $versions_sth->execute($self->{storage_name}, $row->{namespace}, $row->{id}); my @versions = map { $_->[0]; } @{$versions_sth->fetchall_arrayref}; shift @versions; # jettison the most recent foreach my $version (@versions) { - my $volume = new HTFeed::Volume(namespace => $row->{namespace}, - objid => $row->{id}, - package_type => 'ht'); - my $storage = $storage_config->{class}->new(volume => $volume, - config => $storage_config, - name => $self->{storage_name}); - unless (defined $storage) { - die "Unable to get storage for $volume->{namespace}.$volume->{objid}"; + push(@$job, [$row->{namespace}, $row->{id}, $version]); + # Do we have enough to spawn a worker? + if (scalar @$job >= $self->{job_size}) { + $self->wait_for_available_worker; + $self->spawn_worker($job); + $job = []; } - $storage->{timestamp} = $version; - $storage->{zip_suffix} = '.gpg'; - get_logger->trace("deleting archive for $volume->{namespace}.$volume->{objid} version $version" . $dry_run_text); - next if $dry_run; - unless ($storage->delete_objects) { - die "Unable to delete $volume->{namespace}.$volume->{objid}"; + } + } + # Submit the leftovers if any + if (scalar @$job > 0) { + $self->wait_for_available_worker; + $self->spawn_worker($job); + $job = []; + } + # Set max workers to 0 so we wait for all of them to finish. + $self->{max_workers} = 0; + # Wait for all the workers to finish. + while (scalar keys %{$self->{workers}} > 0) { + $self->wait_for_available_worker; + } +} + +# waitpid on existing workers (if any) until one finishes up +# but only if we are at maximum capacity. +# Only waits for workers to finish if we already have the maximum number on the go, +# or if we are finished and have set the maximum to 0. +sub wait_for_available_worker { + my $self = shift; + + if (scalar keys %{$self->{workers}} >= $self->{max_workers}) { + my $pid = 0; + do { + # Wait for any worker. This blocks indefinitely but there's nothing else + # for this process to do but wait. + $pid = waitpid(-1, 0); + if ($pid > 0) { + my $job_file = $self->{workers}->{$pid}; + get_logger->trace("worker [$pid] exited with status $? - removing $job_file"); + unlink $job_file->filename; + delete $self->{workers}->{$pid}; } - get_logger->trace("setting deleted=1 for $volume->{namespace}.$volume->{objid} version $version"); - $update_sth->execute($row->{namespace}, $row->{id}, - $version, $self->{storage_name}); + } while ($pid > 0); + } +} + +sub spawn_worker { + my $self = shift; + my $job = shift; + + my $job_file = File::Temp->new( + DIR => $self->{temp_directory}, + SUFFIX => '.tsv', + CLEANUP => 0 + ); + foreach my $version (@$job) { + print $job_file join("\t", @$version) . "\n"; + } + $job_file->close; + my $pid = fork(); + if (!defined $pid) { + die "Fork failed: $!"; + } elsif ($pid == 0) { + # WORKER PROCESS + my $worker_script = File::Spec->catfile($ENV{FEED_HOME}, 'bin', 'expire_versions.pl'); + my @cmd = ('perl', $worker_script, '-s', $self->{storage_name}); + if ($self->{custom_storage_config}) { + push @cmd, '--config', $self->{storage_config_file}; + } + if ($self->{dry_run}) { + push @cmd, '--dry-run'; } + push @cmd, $job_file->filename; + exec(@cmd) or die "worker [$$] exec failed to run: $!\n"; + } else { + # PARENT PROCESS + get_logger->trace("worker [$pid] started with $job_file (" . scalar(@$job) . " items)"); + $self->{workers}->{$pid} = $job_file; } } +sub select_expired_sql { + my $self = shift; + + my $limit_clause = (defined $self->{limit}) ? " LIMIT $self->{limit}" : ''; + return $select_expired_sql . $limit_clause; +} + 1; __END__ diff --git a/lib/HTFeed/BackupExpirationBatch.pm b/lib/HTFeed/BackupExpirationBatch.pm new file mode 100644 index 00000000..7c078fb9 --- /dev/null +++ b/lib/HTFeed/BackupExpirationBatch.pm @@ -0,0 +1,109 @@ +#!/usr/bin/perl +package HTFeed::BackupExpirationBatch; + +use strict; +use warnings; + +use Carp (); +use Data::Dumper; + +use HTFeed::Config qw(get_config); +use HTFeed::DBTools qw(get_dbh); +use HTFeed::Volume; + +use Log::Log4perl qw(get_logger); + +use HTFeed::Storage::PrefixedVersions; +use HTFeed::Storage::ObjectStore; + +my $update_sql = <<~'SQL'; + UPDATE feed_backups + SET deleted=1 + WHERE namespace=? + AND id=? + AND version=? + AND storage_name=? +SQL + + +sub new { + my $class = shift; + + my $self = { + storage_name => undef, + dry_run_text => '', + @_ + }; + + unless ($self->{storage_name}) { + Carp::croak "$class cannot be constructed without a storage name"; + } + + unless ($self->{job_file}) { + Carp::croak "$class cannot be constructed without a path to a job file"; + } + + unless (-e $self->{job_file}) { + Carp::croak "job file does not exist: $self->{job_file}"; + } + + # Test can init with `storage_config` because it is transient and must be known to workers. + # Production just reads the config as normal + if (!$self->{storage_config}) { + my $config = get_config('storage_classes'); + my $storage_config = $config->{$self->{storage_name}}; + die("Can't find storage configuration for " . $self->{storage_name}) unless $storage_config; + $self->{storage_config} = $storage_config; + } + $self->{dry_run_text} = ' (DRY RUN)' if $self->{dry_run}; + $self->{update_sth} = get_dbh()->prepare($update_sql); + + bless($self, $class); + return $self; +} + +sub run { + my $self = shift; + + open(my $fh, '<:encoding(UTF-8)', $self->{job_file}) or die "could not open $$self->{job_file}: $!"; + while (my $line = <$fh>) { + chomp $line; + my ($namespace, $id, $version) = split(/\t/, $line, 3); + $self->delete_version($namespace, $id, $version); + } +} + +sub delete_version { + my $self = shift; + my $namespace = shift; + my $id = shift; + my $version = shift; + + my $volume = new HTFeed::Volume( + namespace => $namespace, + objid => $id, + package_type => 'ht' + ); + my $storage = $self->{storage_config}->{class}->new( + volume => $volume, + config => $self->{storage_config}, + name => $self->{storage_name} + ); + unless (defined $storage) { + die "Unable to get storage for $volume->{namespace}.$volume->{objid}"; + } + $storage->{timestamp} = $version; + $storage->{zip_suffix} = '.gpg'; + get_logger->trace("deleting archive for $volume->{namespace}.$volume->{objid} version $version" . $self->{dry_run_text}); + return if $self->{dry_run}; + + unless ($storage->delete_objects) { + die "Unable to delete $volume->{namespace}.$volume->{objid} version $version"; + } + get_logger->trace("setting deleted=1 for $volume->{namespace}.$volume->{objid} version $version"); + $self->{update_sth}->execute($namespace, $id, $version, $self->{storage_name}); +} + +1; + +__END__ diff --git a/t/backup_expiration.t b/t/backup_expiration.t index 709d9891..860ab8cb 100644 --- a/t/backup_expiration.t +++ b/t/backup_expiration.t @@ -1,3 +1,4 @@ +use Test::Exception; use Test::Spec; use HTFeed::BackupExpiration; @@ -112,20 +113,48 @@ describe "HTFeed::BackupExpiration" => sub { } share my %vars; + + it "throws exception if storage config can't be found" => sub { + throws_ok { + HTFeed::BackupExpiration->new(storage_name => 'no-such-storage'); + } "/can't find storage configuration/i"; + }; + shared_examples_for "all storages" => sub { it "should create expiration object" => sub { - my $exp = HTFeed::BackupExpiration->new(storage_name => $vars{storage_name}, dry_run => 0); + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0 + ); ok($exp, 'new returns a value'); is($exp->{storage_name}, $vars{storage_name}, 'expiration has correct storage name'); + is_deeply($exp->{storage_config}, $vars{storage_config}, 'expiration has correct storage config'); + }; + + it "uses `limit` parameter to restrict the main database query" => sub { + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0, + limit => 999 + ); + ok($exp->select_expired_sql =~ /LIMIT \d+$/, 'main query ends with LIMIT N'); }; it "deletes nothing when all versions are < 6 months old" => sub { my @versions; + my $new_timestamp = new_random_timestamp(); foreach my $n (1 .. 3) { - my $storage = prepare_storage($vars{storage_name}, new_random_timestamp()); + my $storage = prepare_storage($vars{storage_name}, $new_timestamp); push @versions, $storage; + $new_timestamp += 10; } - my $exp = HTFeed::BackupExpiration->new(storage_name => $vars{storage_name}, dry_run => 0); + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0 + ); $exp->run(); my $deleted = count_deleted_objects('test', 'test'); is($deleted, 0, 'no objects deleted'); @@ -138,7 +167,11 @@ describe "HTFeed::BackupExpiration" => sub { it "deletes nothing with a single old version and single new one" => sub { my $old_storage = prepare_storage($vars{storage_name}, old_random_timestamp()); my $new_storage = prepare_storage($vars{storage_name}, new_random_timestamp()); - my $exp = HTFeed::BackupExpiration->new(storage_name => $vars{storage_name}, dry_run => 0); + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0 + ); $exp->run(); my $deleted = count_deleted_objects('test', 'test'); is($deleted, 0, 'no objects deleted'); @@ -150,13 +183,19 @@ describe "HTFeed::BackupExpiration" => sub { it "deletes the oldest old versions when there are no new ones" => sub { my @old_versions; + my $old_timestamp = old_random_timestamp(); foreach my $n (1 .. 3) { - my $storage = prepare_storage($vars{storage_name}, old_random_timestamp()); + my $storage = prepare_storage($vars{storage_name}, $old_timestamp); push @old_versions, $storage; + $old_timestamp -= 10; } @old_versions = sort { $a->{timestamp} cmp $b->{timestamp}; } @old_versions; my $keep = pop @old_versions; - my $exp = HTFeed::BackupExpiration->new(storage_name => $vars{storage_name}, dry_run => 0); + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0 + ); $exp->run(); foreach my $storage (@old_versions) { my $deleted = count_deleted_objects('test', 'test', $storage->{timestamp}); @@ -172,18 +211,26 @@ describe "HTFeed::BackupExpiration" => sub { it "deletes the oldest old versions when there are new ones" => sub { my @old_versions; + my $old_timestamp = old_random_timestamp(); foreach my $n (1 .. 3) { - my $storage = prepare_storage($vars{storage_name}, old_random_timestamp()); + my $storage = prepare_storage($vars{storage_name}, $old_timestamp); push @old_versions, $storage; + $old_timestamp -= 10; } @old_versions = sort { $a->{timestamp} cmp $b->{timestamp}; } @old_versions; my $keep = pop @old_versions; my @new_versions; + my $new_timestamp = new_random_timestamp(); foreach my $n (1 .. 3) { - my $storage = prepare_storage($vars{storage_name}, new_random_timestamp()); + my $storage = prepare_storage($vars{storage_name}, $new_timestamp); push @new_versions, $storage; + $new_timestamp += 10; } - my $exp = HTFeed::BackupExpiration->new(storage_name => $vars{storage_name}, dry_run => 0); + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0 + ); $exp->run(); foreach my $storage (@old_versions) { my $deleted = count_deleted_objects('test', 'test', $storage->{timestamp}); @@ -205,11 +252,17 @@ describe "HTFeed::BackupExpiration" => sub { it "deletes nothing when there are old versions but is given the dry run flag" => sub { my @versions; + my $old_timestamp = old_random_timestamp(); foreach my $n (1 .. 3) { - my $storage = prepare_storage($vars{storage_name}, old_random_timestamp()); + my $storage = prepare_storage($vars{storage_name}, $old_timestamp ); push @versions, $storage; + $old_timestamp -= 10; } - my $exp = HTFeed::BackupExpiration->new(storage_name => $vars{storage_name}, dry_run => 1); + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 1 + ); $exp->run(); my $deleted = count_deleted_objects('test', 'test'); is($deleted, 0, 'no objects deleted'); @@ -218,11 +271,41 @@ describe "HTFeed::BackupExpiration" => sub { ok(!zip_deleted($storage), "new ($storage->{timestamp}) zip left intact"); } }; + + it "spawns multiple workers when job size is small" => sub { + my @old_versions; + my $old_timestamp = old_random_timestamp(); + foreach my $n (1 .. 10) { + my $storage = prepare_storage($vars{storage_name}, --$old_timestamp); + push @old_versions, $storage; + $old_timestamp -= 10; + } + @old_versions = sort { $a->{timestamp} cmp $b->{timestamp}; } @old_versions; + my $keep = pop @old_versions; + my $exp = HTFeed::BackupExpiration->new( + storage_name => $vars{storage_name}, + storage_config => $vars{storage_config}, + dry_run => 0, + job_size => 1 + ); + $exp->run(); + foreach my $storage (@old_versions) { + my $deleted = count_deleted_objects('test', 'test', $storage->{timestamp}); + is($deleted, 1, 'old object is marked feed_backups.deleted'); + ok(mets_deleted($storage), "old ($storage->{timestamp}) mets deleted"); + ok(zip_deleted($storage), "old ($storage->{timestamp}) zip deleted"); + } + my $deleted = count_deleted_objects('test', 'test', $keep->{timestamp}); + is($deleted, 0, 'newer object is not marked feed_backups.deleted'); + ok(!mets_deleted($keep), "newer ($keep->{timestamp}) mets left intact"); + ok(!zip_deleted($keep), "newer ($keep->{timestamp}) zip left intact"); + }; }; describe "HTFeed::BackupExpiration for PrefixedVersions" => sub { before each => sub { $vars{storage_name} = 'prefixedversions-test'; + $vars{storage_config} = get_config('storage_classes')->{$vars{storage_name}}; }; it_should_behave_like "all storages"; @@ -231,6 +314,7 @@ describe "HTFeed::BackupExpiration" => sub { describe "HTFeed::BackupExpiration for ObjectStore" => sub { before each => sub { $vars{storage_name} = 'objectstore-test'; + $vars{storage_config} = get_config('storage_classes')->{$vars{storage_name}}; }; it_should_behave_like "all storages";