File: //usr/local/share/perl5/Cpanel/TaskQueue/Scheduler.pm
package Cpanel::TaskQueue::Scheduler;
# cpanel - Cpanel/TaskQueue/Scheduler.pm Copyright(c) 2010 cPanel, Inc.
# All rights Reserved.
# copyright@cpanel.net http://cpanel.net
# This code is subject to the cPanel license. Unauthorized copying is prohibited
#
# This module handles queuing of tasks for execution. The queue is persistent
# handles consolidating of duplicate tasks.
use strict;
#use warnings;
use YAML::Syck (); # Data Serialization
use Cpanel::TaskQueue ();
use Cpanel::TaskQueue::Task();
use Cpanel::CacheFile ();
our $VERSION = 0.307;
# -----------------------------------------------------------------------------
# Policy code: The following allows is a little weird because its intent is to
# change the policy by which some code is executed, without adding a gratuitous
# object and polymorphism into the mix.
#
# I had originally redefined the methods, but that seems a little too magical
# when indirecting through goto works as well (if a little slower).
# These methods are intended to help document the importance of the message and
# to supply 'seam' that could be used to modify the logging behavior of the
# CacheFile.
my $are_policies_set = 0;
my $pkg = __PACKAGE__;
#
# This method allows changing the policies for logging and locking.
sub import {
my $class = shift;
die "Not an even number of arguments to the $pkg module\n" if @_ % 2;
die "Policies already set elsewhere\n" if $are_policies_set;
return 1 unless @_; # Don't set the policies flag.
while (@_) {
my ( $policy, $module ) = splice( @_, 0, 2 );
my @methods = ();
if ( '-logger' eq $policy ) {
Cpanel::CacheFile->import( '-logger' => $module );
}
else {
die "Unrecognized policy '$policy'\n";
}
}
$are_policies_set = 1;
return 1;
}
# Replacement for List::Util::first, so I don't need to bring in the whole module.
sub _first (&@) { ## no critic(ProhibitSubroutinePrototypes)
my $pred = shift;
local $_;
foreach (@_) {
return $_ if $pred->();
}
return;
}
# Namespace value used when creating unique task ids.
my $tasksched_uuid = "TaskQueue-Scheduler";
{
my $FILETYPE = 'TaskScheduler'; # Identifier at the beginning of the cache file
my $CACHE_VERSION = 2; # Cache file version number.
# Disk Cache & cache file.
#
sub get_name { $_[0]->{scheduler_name}; }
# --------------------------------------
# Class methods
# Initialize parameters.
sub new {
my ( $class, $args_ref ) = @_;
my $self = bless {
next_id => 1,
time_queue => [],
disk_cache => undef,
}, $class;
if ( exists $args_ref->{token} ) {
my ( $version, $name, $file ) = split( ':\|:', $args_ref->{token} );
# have all parts
Cpanel::CacheFile->_throw('Invalid token.')
unless defined $version
and defined $name
and defined $file;
# all parts make sense.
Cpanel::CacheFile->_throw('Invalid token.')
unless 'tqsched1' eq $version and $file =~ m{/\Q$name\E_sched\.yaml$};
$self->{scheduler_name} = $name;
$self->{disk_cache_file} = $file;
}
else {
Cpanel::CacheFile->_throw('No caching directory supplied.') unless exists $args_ref->{cache_dir};
Cpanel::CacheFile->_throw('No scheduler name supplied.') unless exists $args_ref->{name};
$self->{disk_cache_file} = "$args_ref->{cache_dir}/$args_ref->{name}_sched.yaml";
$self->{scheduler_name} = $args_ref->{name};
}
# Make a disk file to track the object.
my $cache_args = {
cache_file => $self->{disk_cache_file}, data_obj => $self,
exists $args_ref->{cache_timeout} ? ( timeout => $args_ref->{cache_timeout} ) : (),
exists $args_ref->{logger} ? ( logger => $args_ref->{logger} ) : (),
};
eval { $self->{disk_cache} = Cpanel::CacheFile->new($cache_args); };
if ($@) {
my $ex = $@;
# If not a loading error, rethrow.
Cpanel::CacheFile->_throw($ex) unless $ex =~ /Not a recognized|Invalid version/;
Cpanel::CacheFile->_warn($ex);
Cpanel::CacheFile->_warn("Moving bad cache file and retry.\n");
Cpanel::CacheFile->_notify(
'Unable to load TaskQueue::Scheduler metadata',
"Loading of [$self->{disk_cache_file}] failed: $ex\n" . "Moving bad file to [$self->{disk_cache_file}.broken] and retrying.\n"
);
unlink "$self->{disk_cache_file}.broken";
rename $self->{disk_cache_file}, "$self->{disk_cache_file}.broken";
$self->{disk_cache} = Cpanel::CacheFile->new($cache_args);
}
return $self;
}
sub throw {
my $self = shift;
return $self->{disk_cache} ? $self->{disk_cache}->throw(@_) : Cpanel::CacheFile->_throw(@_);
}
# Not using warn, so don't define it.
sub info {
my $self = shift;
return $self->{disk_cache} ? $self->{disk_cache}->info(@_) : undef;
}
# -------------------------------------------------------
# Public methods
sub load_from_cache {
my ( $self, $fh ) = @_;
local $/;
my ( $magic, $version, $meta ) = YAML::Syck::Load( scalar <$fh> );
$self->throw("Not a recognized TaskQueue Scheduler cache.\n") unless $magic eq $FILETYPE;
$self->throw("Invalid version of TaskQueue Scheduler cache.\n") unless $version eq $CACHE_VERSION;
# Next id should continue increasing.
# (We might want to deal with wrap-around at some point.)
$self->{next_id} = $meta->{nextid} if $meta->{nextid} > $self->{next_id};
# Clean queues that have been read from disk.
$self->{time_queue} = [ grep { _is_item_sane($_) } @{ $meta->{waiting_queue} } ];
return 1;
}
sub save_to_cache {
my ( $self, $fh ) = @_;
my $meta = {
nextid => $self->{next_id},
waiting_queue => $self->{time_queue},
};
return print $fh YAML::Syck::Dump( $FILETYPE, $CACHE_VERSION, $meta );
}
sub schedule_task {
my ( $self, $command, $args ) = @_;
$self->throw('Cannot queue an empty command.') unless defined $command;
$self->throw('Args is not a hash ref.') unless defined $args and 'HASH' eq ref $args;
my $time = time;
$time += $args->{delay_seconds} if exists $args->{delay_seconds};
$time = $args->{at_time} if exists $args->{at_time};
if ( eval { $command->isa('Cpanel::TaskQueue::Task') } ) {
if ( 0 == $command->retries_remaining() ) {
$self->info('Task with 0 retries not scheduled.');
return;
}
return $self->_schedule_the_task( $time, $command );
}
# must have non-space characters to be a command.
$self->throw('Cannot queue an empty command.') unless $command =~ /\S/;
my @retry_attrs = ();
if ( exists $args->{attempts} ) {
return unless $args->{attempts} > 0;
@retry_attrs = (
retries => $args->{attempts},
userdata => { sched => $self->get_token() }
);
}
my $task = Cpanel::TaskQueue::Task->new(
{
cmd => $command, nsid => $tasksched_uuid, id => $self->{next_id}++,
@retry_attrs
}
);
return $self->_schedule_the_task( $time, $task );
}
sub unschedule_task {
my ( $self, $uuid ) = @_;
unless ( _is_valid_uuid($uuid) ) {
$self->throw('No Task uuid argument passed to unschedule_task.');
}
# Lock the queue before we begin accessing it.
my $guard = $self->{disk_cache}->synch();
my $old_count = @{ $self->{time_queue} };
$self->{time_queue} = [ grep { $_->{task}->uuid() ne $uuid } @{ $self->{time_queue} } ];
# All changes complete, save to disk.
$guard->update_file();
return $old_count > @{ $self->{time_queue} };
}
sub is_task_scheduled {
my ( $self, $uuid ) = @_;
unless ( _is_valid_uuid($uuid) ) {
$self->throw('No Task uuid argument passed to is_task_scheduled.');
}
# Update from disk, but don't worry about lock. Information only.
$self->{disk_cache}->synch();
return _first { $_->{task}->uuid() eq $uuid } @{ $self->{time_queue} };
}
sub when_is_task_scheduled {
my ( $self, $uuid ) = @_;
unless ( _is_valid_uuid($uuid) ) {
$self->throw('No Task uuid argument passed to when_is_task_scheduled.');
}
# Update from disk, but don't worry about lock. Information only.
$self->{disk_cache}->synch();
my $task = _first { $_->{task}->uuid() eq $uuid } @{ $self->{time_queue} };
return unless defined $task;
return $task->{time};
}
sub how_many_scheduled {
my ($self) = @_;
# Update from disk, but don't worry about lock. Information only.
$self->{disk_cache}->synch();
return scalar @{ $self->{time_queue} };
}
sub peek_next_task {
my ($self) = @_;
# Update from disk, but don't worry about lock. Information only.
$self->{disk_cache}->synch();
return unless @{ $self->{time_queue} };
return $self->{time_queue}->[0]->{task}->clone();
}
sub seconds_until_next_task {
my ($self) = @_;
# Update from disk, but don't worry about lock. Information only.
$self->{disk_cache}->synch();
return unless @{ $self->{time_queue} };
return $self->{time_queue}->[0]->{time} - time;
}
sub process_ready_tasks {
my ( $self, $queue ) = @_;
unless ( defined $queue and eval { $queue->can('queue_task') } ) {
$self->throw('No valid queue supplied.');
}
# Don't generate lock yet, we may not need one.
$self->{disk_cache}->synch();
my $count = 0;
my $guard;
eval {
while ( @{ $self->{time_queue} } ) {
my $item = $self->{time_queue}->[0];
last if time < $item->{time};
if ( !$guard ) {
# Now we know we'll be changing the schedule, so we need to
# lock it.
$guard ||= $self->{disk_cache}->synch();
next;
}
# Should be safe from deadlock unless queue calls back to me.
$queue->queue_task( $item->{task} );
++$count;
# Only remove from the schedule when the queue has processed it.
shift @{ $self->{time_queue} };
}
};
my $ex = $@;
$guard->update_file() if $count && $guard;
die $ex if $ex;
return $count;
}
sub get_token {
my ( $self, $command, $time ) = @_;
return join( ':|:', 'tqsched1', $self->{scheduler_name}, $self->{disk_cache_file} );
}
sub snapshot_task_schedule {
my ($self) = @_;
$self->{disk_cache}->synch();
return [
map {
{ time => $_->{time}, task => $_->{task}->clone() }
} @{ $self->{time_queue} }
];
}
# ---------------------------------------------------------------
# Private Methods.
sub _schedule_the_task {
my ( $self, $time, $task ) = @_;
my $guard = $self->{disk_cache}->synch();
my $item = { time => $time, task => $task };
# if the list is empty, or time after all in list.
if ( !@{ $self->{time_queue} } or $time >= $self->{time_queue}->[-1]->{time} ) {
push @{ $self->{time_queue} }, $item;
}
elsif ( $time < $self->{time_queue}->[0]->{time} ) {
# schedule before anything in the list
unshift @{ $self->{time_queue} }, $item;
}
else {
# find the correct spot in the list.
foreach my $i ( 1 .. $#{ $self->{time_queue} } ) {
next unless $self->{time_queue}->[$i]->{time} > $time;
splice( @{ $self->{time_queue} }, $i, 0, $item );
last;
}
}
$guard->update_file();
return $task->uuid();
}
sub _is_item_sane {
my ($item) = @_;
return unless 'HASH' eq ref $item;
return unless exists $item->{task} and exists $item->{time};
return unless eval { $item->{task}->isa('Cpanel::TaskQueue::Task') };
return $item->{time} =~ /^\d+$/;
}
sub _is_valid_uuid {
return Cpanel::TaskQueue::Task::is_valid_taskid(shift);
}
}
1; # Magic true value required at the end of the module