#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# $Id: SpoolManager.pm 3 2007-10-18 16:20:19Z guillaume $
#

package A2P::SpoolManager;

# Derived class from Thread.pm
use base qw(A2P::Thread);

use strict;
use Fcntl;
use Fcntl qw( :DEFAULT );
use Time::HiRes qw( usleep gettimeofday );
use A2P::Globals;
use A2P::Syslog;
use A2P::Tools qw( FcntlLocked WaitFcntlLocked FcntlUnlocked);
use A2P::Com qw( IsCom comJOB );
use A2P::JobStatus 'a2pjobstate' ;

BEGIN {
    our $VERSION = sprintf "%s", q$Rev: 415 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;

# Note on call style: helpers imported from the A2P::* modules are called
# with the '&name(...)' form throughout this file. Their prototypes (if any)
# are not visible from here, so the form is kept to avoid changing how the
# calls are parsed.

#+--------------------------------------------------------------------------+
#| Private helpers for file locking                                         |
#+--------------------------------------------------------------------------+

# get_locked( \$file )
#
# Try to reserve a spool file for this service. $file is a name relative to
# $AFPSPOOL. The prefix lock is taken first, then a per-file lock file in
# "$AFPSPOOL/.a2p/" is locked and the file is renamed so that its
# "<prefix>." part becomes the "$LOCKID/" folder.
#
# Returns 1 on success, in which case the referenced scalar is updated with
# the new name (relative to $AFPSPOOL), and 0 otherwise.
#
# The prototype is kept from the original code; it has no effect as this sub
# is called as a method.
sub get_locked (\$) {
    &TIMESTAT('LOCKING-FILE');
    my $self = shift ;
    my $ref  = shift ;

    my $ret = 1 ;
    my $LCK ;
    my $prefix = &prefix($$ref);

    my $filename = $AFPSPOOL . '/' . $$ref ;
    $filename =~ s{//}{/}g ; # Clean filename

    # First lock on prefix
    unless ($self->lockprefix($prefix)) {
        &Debug("prefix '$prefix' not locked");
        &TIMESTAT('LOCKING-FILE');
        return 0 ;
    }

    unless ( -e $filename ) {
        &Debug("'$filename' not found");
        $self->unlockprefix($prefix);
        &TIMESTAT('LOCKING-FILE');
        return 0 ;
    }

    my $lockfile = $AFPSPOOL . "/.a2p/" . $$ref . ".LCK" ;

    if (sysopen( $LCK , $lockfile , O_WRONLY | O_CREAT )) {
        &Debug("Trying to get a lock on '$lockfile'");

        if (&FcntlLocked($LCK)) {
            truncate($LCK,0);
            my $newname = $filename ;

            # Replace "prefix." string by LOCKID folder name. The prefix is
            # text taken from the file name, so it is matched literally.
            $newname =~ s|/\Q$prefix\E\.|/$LOCKID/| ;

            &Debug("Renaming '$filename' to '$newname'");
            unless (rename $filename , $newname) {
                &Debug("Can't rename '$$ref': $!");
                $ret = 0 ;
            }

            if ( $ret and -e $newname ) {
                $newname =~ s,\Q$AFPSPOOL\E/*,, ; # Keep only filename
                $$ref = $newname ;
                &Debug("'$newname' locked");

            } else {
                &Debug("'$newname' not available");
                $ret = 0 ;
            }

        } else {
            &Debug("Can't lock '$lockfile': $!");
            $ret = 0 ;
        }

        # Remove lock file
        close($LCK);
        unlink $lockfile ;

    } else {
        # Do not depend on the value returned by Error to flag the failure
        &Error("Can't open '$lockfile' for writing: $!");
        $ret = 0 ;
    }

    unless ($self->unlockprefix($prefix)) {
        &Warn("Can't unlock prefix on $prefix before locking on $filename: $!");
        &Info("$prefix file must be deleted manually " .
            "after all concerned services has been stopped");
    }

    &TIMESTAT('LOCKING-FILE');
    return $ret ;
}

# lockname( $name )
#
# Take the "$AFPSPOOL/$name.LCK" lock for this process, write our PID in it
# and make sure the "$AFPSPOOL/$name" folder exists. The open lock handle is
# kept in $self->{LOCKEDSPOOL}.
#
# Returns the folder path on success and "" on any failure.
sub lockname {
    my $self = shift ;
    my $LCK ;
    my $lock     = $AFPSPOOL . '/' . $_[0] ;
    my $lockfile = $lock . '.LCK' ;

    &Debug("'$_[0]' seems still locked") if ( -s $lockfile );

    unless (sysopen( $LCK , $lockfile , O_WRONLY | O_CREAT )) {
        &Error("Can't open '$lockfile' for writing: $!");
        return "" ;
    }

    unless (&FcntlLocked($LCK)) {
        &Error("Can't lock '$lockfile': $!");
        return "" ;
    }

    truncate $LCK , 0 ;

    # Write our PID in Spool Lock File. The handle stays open while the
    # spool is locked, so the write must not sit in a buffer: the size test
    # above can only notice a held lock if the PID really is in the file.
    syswrite( $LCK , "$$" );

    $self->{LOCKEDSPOOL} = $LCK ;

    # Create folder where will be moved files if needed
    unless ( -d $lock ) {
        unless (mkdir $lock , 0775) {
            &Error("Can't create '$lock' folder: $!");
            return "" ;
        }
    }

    return $lock ;
}

# Cached list of spool prefixes, refreshed by update_prefix
my @prefix = () ;

# prefix( $file )
#
# Return the leading part of $file matched (case-insensitively) by the first
# known prefix followed by a dot and at least one character, or "" when no
# prefix matches. Prefixes are used as regex patterns.
sub prefix {
    my $file = shift ;
    foreach my $pattern ( @prefix ) {
        return $1 if ( $file =~ /^($pattern)\..+/i );
    }
    return "" ;
}

# update_prefix( $string )
#
# Reset the prefix cache from a whitespace separated string and return the
# new list.
sub update_prefix {
    # Update prefix array as optimization and SPOOL_PREFIX can has been updated
    # Splitting on ' ' skips leading whitespace, which would otherwise yield
    # an empty first prefix
    return @prefix = split( ' ', $_[0] ) ;
}

################################################################################
##                       Afpspool file handling code                          ##
################################################################################

# Do( \$request )
#
# Handle one request. A comJOB request with the DONE state is acknowledged
# and 1 is returned. Any other comJOB request is answered with a spool file
# locked for that job, or with NOMOREFILE when none is available; the value
# sent is also returned. An unsupported request is reported and "" is
# returned.
sub Do {
    my $self = shift ;
    my $ref  = shift ;

    my ( $count , $lastwasmax ) = ( 0 , 1 );
    my $file = "" ;
    my $AfpName = "" ;

    my @Job = &IsCom( comJOB , $$ref );

    # Default request is give me a filename for that job
    if ( @Job ) {
        if ( $Job[1] == DONE ) {
            # Reset last kept information to skip annoying alert before quitting
            # The JobManager is saying it is managing the file, so it will keep
            # trace of it even if it should quit or abort
            ( $self->{LASTJOB} , $self->{LASTFILE} ) =
                ( $Job[0] , NOMOREFILE );
            $self->AnswerDone( $Job[0] );
            return 1 ;
        }

        # Keep internal stats
        &UPSTAT('SPOOL_CHECK');

        # Locking AFPSPOOL for our LOCKID
        if (!defined($self->{LockedSpool}->{$AFPSPOOL})) {

            # Unlock old spool, needed if AFPSPOOL is updated
            $self->DoBeforeQuit();

            # LOCKID will also be a folder in AFPSPOOL, see &get_locked
            $self->{LockedSpool}->{$AFPSPOOL} = $self->lockname("$LOCKID") ;

            if ( $self->{LockedSpool}->{$AFPSPOOL} ) {
                &Debug("'$AFPSPOOL' locked with '$LOCKID' lockid");

            } else {
                $self->ThisError("Can't lock '$AFPSPOOL' folder on '$LOCKID'");
                &Alert("Quitting as AFPSPOOL is not available," .
                    " maybe LOCKID is still allocated to another service");

                # Tell there's nothing we can do
                $self->Return( $Job[0] , 1 );
                delete $self->{LockedSpool}->{$AFPSPOOL};
                $MUSTQUIT ++ ; # Fatal error thread should quit

                # NOTE(review): processing continues below and the job gets
                # a second Return call - confirm this is intended
            }
        }

        # Update prefix array as optimization and needed when SPOOL_PREFIX
        # has been updated
        &update_prefix($SPOOL_PREFIX);

        &Debug("Finding a file for job $Job[0]");

        while (length($file) == 0) {
            $count = scalar( @{$self->{AfpFiles}} );

            # Update the cache if needed
            if ( $count == 0 or
                ( $count < $MINFOLDERCOUNT and $lastwasmax ) ) {

                # Set timestamp to MAXRETRYTIMER millisecond earlier
                my $timestamp =
                    scalar(gettimeofday()) * 1000 - $MAXRETRYTIMER ;

                # Spawn command to get files ordered by age from afpspool
                my $Command = "ls -t1r " . $AFPSPOOL ;
                &Debug("Starting '$Command' to get file list in spool");
                my $CMD ;
                unless (open( $CMD , "$Command 2>/dev/null |" )) {
                    &Alert("Can't start \"$Command\" command");
                    return ;
                }

                # A dedicated variable is used for listed names so $file
                # stays empty until a file is really selected below
                while ( defined( my $entry = <$CMD> ) ) {
                    chomp($entry);
                    &Debug("Checking '$entry' is for us");

                    # Skip our LOCKID locking file
                    next if ( $entry =~ /^\Q$LOCKID\E\.LCK$/ );

                    # Remove the path from file
                    $entry =~ s/\Q$AFPSPOOL\E\/*// ;

                    # Keep file only if its prefix match one from the list
                    my $prefix = &prefix($entry) ;
                    next unless ($prefix);

                    # Keep only not locked files and remove it if in list as
                    # it is just locked by another thread
                    # NOTE(review): when the stripped name is not yet cached
                    # it is still added to the listing below - confirm
                    if ( $entry =~ /^(.*)\.LCK$/i ) {
                        $entry = $1 ;
                        @{$self->{AfpFiles}} =
                            grep { $_ ne $entry } @{$self->{AfpFiles}} ;
                    }

                    # Skip if file is still in our lists
                    next if ( defined($self->{AfpName}->{$entry})
                        and $self->{AfpName}->{$entry} );

                    # Same computation as in afpds2tex module
                    my ( $basename ) = $entry =~ /^\Q$prefix\E\.(.*)$/ ;
                    ( $AfpName ) = defined($basename) ?
                        $basename =~ $AFPNAME_REGEX : () ;

                    unless (defined($AfpName) and $AfpName) {
                        &Error("Can't set an afpname with '$entry', skipping");
                        next ;
                    }

                    $self->{AfpName}->{$entry} = $AfpName ;
                    push @{$self->{AfpFiles}} , $entry ;
                    $self->{Timestamp}->{$entry} = $timestamp ;

                    &Debug("'$entry' added to listing as '$AfpName' Jobname");

                    last if ( ++$count >= $MAXFOLDERCOUNT );
                }

                $lastwasmax = $count >= $MAXFOLDERCOUNT ? 1 : 0 ;

                close($CMD);
            }

            # Check files availability
            if (scalar( @{$self->{AfpFiles}} )) {
                my $found = 0 ;
                my @keep = () ;

                # Loop to try first file from list
                while (scalar( @{$self->{AfpFiles}} )) {
                    $file = shift(@{$self->{AfpFiles}});
                    $AfpName = $self->{AfpName}->{$file} ;

                    my $file_time = $self->{Timestamp}->{$file} ;
                    my $can_try =
                        scalar(gettimeofday()) * 1000 - $file_time ;

                    # Select available file by trying to lock it
                    if ( $can_try >= $MAXRETRYTIMER
                        and $self->get_locked(\$file)) {

                        # file has been updated by &get_locked
                        &Debug("Found '$file' for Job $Job[0]");
                        $found ++ ;
                        last ;

                    } elsif ( $can_try >= $MAXRETRYTIMER ) {
                        # Set file tried and will only retry later
                        $self->{Timestamp}->{$file} += $can_try ;
                    }

                    # Cached names are relative to the spool, so the file
                    # must be looked for there and not in current directory
                    if ( -e $AFPSPOOL . '/' . $file ) {
                        push @keep, $file ;

                    } else {
                        delete $self->{Timestamp}->{$file} ;
                        # Release also the AfpName
                        delete $self->{AfpName}->{$file} ;
                    }
                }

                # Store kept files
                if ( @keep ) {
                    &Debug("Files to retry later: @keep");
                    unshift @{$self->{AfpFiles}}, @keep ;
                }

                if ($found) {
                    # Keep internal stats
                    &UPSTAT('SPOOL_GOT_FILE');

                    # Initialize jobstatus with AfpName to keep restarts in mind
                    &a2pjobstate( $AfpName , 1 , 'o', {
                        AFP    => $AfpName,
                        JOBID  => $Job[0],
                        STATUS => 'STARTING',
                        FILE   => $file,
                        INFOS  => 'Started'
                        } )
                        or &Info("Can't initialize '$Job[0]' job status");

                    # We must remove now unused keys retrieving the first key
                    # NOTE(review): keys are selected by AfpName value, so
                    # any other cached file sharing this AfpName also loses
                    # its cache entries - confirm this is intended
                    my @unused = grep { $self->{AfpName}->{$_} eq $AfpName }
                        keys(%{$self->{AfpName}});
                    foreach my $key ( @unused ) {
                        # Release this AfpName
                        delete $self->{AfpName}->{$key} ;
                        # Release cache timestamp
                        delete $self->{Timestamp}->{$key} ;
                    }

                    last ;

                } else {
                    &Debug("No available file found for Job $Job[0]");
                    $file = NOMOREFILE ;

                    # Keep internal stats
                    &UPSTAT('SPOOL_GOT_NOFILE');

                    # Check available timestamp
                    my $time_count = keys(%{$self->{Timestamp}}) ;
                    unless ( $time_count == @{$self->{AfpFiles}}) {
                        my $diff = $time_count - @{$self->{AfpFiles}} ;
                        &Debug("Found diff between timestamp keys " .
                            "and available files: $diff",
                            "Keys = " . join(' ',keys(%{$self->{Timestamp}})),
                            "Files = @{$self->{AfpFiles}}");
                        &Info("Got available files cache incoherence");
                        $self->{AfpFiles}  = [] ;
                        $self->{Timestamp} = {} ;
                        $self->{'AfpName'} = {} ;
                    }
                }

            } else {
                &Debug("No file found for Job $Job[0]");
                $file = NOMOREFILE ;

                # Keep internal stats
                &UPSTAT('SPOOL_GOT_NOFILE');
            }
        } # Until file set or state is nomorefile

        $self->Return( $Job[0] , $file );
        $self->AnswerDone( $Job[0] );

        # Keep last information to check it before quitting
        ( $self->{LASTJOB} , $self->{LASTFILE} ) = ( $Job[0] , $file );

    } else {
        $self->ThisError("Can't do unsupported request '$$ref'");

        # Keep internal stats
        &UPSTAT('SPOOL_CHECK_BADREQFORMAT');
    }

    return $file ;
}

# openLockingPrefix( [ @prefixes ] )
#
# Open (creating it when needed) one "$AFPSPOOL/.a2p/<PREFIX>.LCK" file per
# given prefix, or per configured prefix when none is given. Handles are kept
# in $self->{LockingPrefix} and file names in $self->{LockFileName}, both
# keyed by the uppercased prefix. The work is done while holding the
# "$AFPSPOOL/.a2p/.LCK" lock.
#
# Returns 1 on success, or the value returned by Error when the shared lock
# can't be opened, taken or released.
sub openLockingPrefix {
    my $self = shift ;

    # First lock the locking folder
    my $lock = $AFPSPOOL . '/.a2p/.LCK' ;
    my $LCK ;
    sysopen( $LCK, $lock , O_WRONLY | O_CREAT )
        or return &Error("Can't open lock in shared folder in spool: $!");

    return &Error("Can't get lock on shared folder in spool: $!")
        unless (&WaitFcntlLocked($LCK));

    # Create prefix locking files when required with uppercase filename
    foreach my $prefix
        ( map { uc } (@_? @_ : &update_prefix($SPOOL_PREFIX)) ) {

        my $file = $AFPSPOOL . '/.a2p/' . $prefix . '.LCK' ;

        # Don't try to reopen file
        next if (-e $file and defined($self->{LockingPrefix}->{$prefix}));

        unless (sysopen( $self->{LockingPrefix}->{$prefix} , $file ,
            O_WRONLY|O_CREAT )) {
            &Error("Can't open prefix lock in shared folder: $!");
            # Don't keep an unusable handle, so the prefix is seen as not
            # opened and can be opened again later
            delete $self->{LockingPrefix}->{$prefix} ;
            next ;
        }

        $self->{LockFileName}->{$prefix} = $file ;
    }

    # Release lock on locking prefix folder
    return &Error("Can't release lock on shared folder in spool: $!")
        unless (&FcntlUnlocked($LCK));

    close($LCK)
        or &Warn("Found problem when closing locking file in spool: $!");

    1 ;
}

# closeLockingPrefix()
#
# Unlock, close and unlink every prefix lock file opened by this object,
# while holding the "$AFPSPOOL/.a2p/.LCK" lock.
#
# Returns 1 on success, or the value returned by Error when the shared lock
# can't be opened, taken or released.
sub closeLockingPrefix {
    my $self = shift ;

    # First lock the locking folder
    my $lock = $AFPSPOOL . '/.a2p/.LCK' ;
    my $LCK ;
    sysopen( $LCK, $lock , O_WRONLY | O_CREAT )
        or return &Error("Can't open lock in shared folder in spool: $!");

    return &Error("Can't get lock on shared folder in spool: $!")
        unless (&WaitFcntlLocked($LCK));

    foreach my $prefix ( keys(%{$self->{LockingPrefix}}) ) {
        &FcntlUnlocked($self->{LockingPrefix}->{$prefix});
        close($self->{LockingPrefix}->{$prefix});
        delete $self->{LockingPrefix}->{$prefix};

        # Forget the file name too as the file is removed
        my $name = delete $self->{LockFileName}->{$prefix};
        unlink $name if (defined($name));
    }

    # Release lock on locking prefix folder
    return &Error("Can't release lock on shared folder in spool: $!")
        unless (&FcntlUnlocked($LCK));

    close($LCK)
        or &Warn("Found problem when closing locking file in spool: $!");

    1 ;
}

# lockprefix( $prefix )
#
# Take the lock on the given prefix lock file, reopening it when it is
# missing. The locking call waits or not depending on $WAIT_ON_PREFIX and up
# to three attempts are made.
#
# Returns 1 when the lock has been taken and 0 otherwise.
sub lockprefix {
    &TIMESTAT('LOCKING-PREFIX');
    my $self   = shift ;
    my $prefix = uc(shift) ;

    # Select the locking call through a lexical code reference
    my $locktest = $WAIT_ON_PREFIX ? \&WaitFcntlLocked : \&FcntlLocked ;

    &UPSTAT('GET-LOCKING-PREFIX');

    my ( $try, $locked ) = ( 3, 0 ) ;
    while ( $try -- ) {
        my $lockfile = $self->{LockFileName}->{$prefix} ;
        $self->openLockingPrefix($prefix)
            unless ( defined($lockfile) and -e $lockfile );

        unless (defined($self->{LockingPrefix}->{$prefix})) {
            $self->openLockingPrefix($prefix);
            &UPSTAT('BAD-PREFIX-TO-LOCK');
            next ;
        }

        if ($locktest->($self->{LockingPrefix}->{$prefix})) {
            &UPSTAT('GOOD-LOCKING-PREFIX');
            $locked ++ ;
            last ;
        }

        &UPSTAT('BAD-LOCKING-PREFIX');
    }

    &TIMESTAT('LOCKING-PREFIX');
    return $locked ;
}

# unlockprefix( $prefix )
#
# Release the lock on the given prefix lock file.
#
# Returns 1 on success and 0 when there is no handle for that prefix or when
# unlocking fails.
sub unlockprefix {
    &TIMESTAT('UNLOCKING-PREFIX');
    my $self   = shift ;
    my $prefix = uc(shift) ;
    my $ret    = 1 ;

    &UPSTAT('GET-PREFIX-UNLOCKING');

    if (!defined($self->{LockingPrefix}->{$prefix})) {
        # This is a failure on its own: the result must be false for the
        # caller and there is no system error to report
        &Warn("Can't unlock not defined filehandle");
        &UPSTAT('BAD-PREFIX-TO-UNLOCK');
        $ret = 0 ;

    } elsif (&FcntlUnlocked($self->{LockingPrefix}->{$prefix})) {
        &UPSTAT('GOOD-PREFIX-UNLOCKING');

    } else {
        &Warn("Can't unlock prefix '$prefix': $!");
        &UPSTAT('BAD-PREFIX-UNLOCKING');
        $ret = 0 ;
    }

    &TIMESTAT('UNLOCKING-PREFIX');
    return $ret ;
}

# ThreadInit()
#
# Reset the file cache and (re)open the prefix lock files. Returns the
# result of openLockingPrefix.
sub ThreadInit {
    my $self = shift ;

    $self->{AfpFiles}  = [] ;
    $self->{Timestamp} = {} ;
    $self->{AfpName}   = {} ;

    $self->{LockingPrefix} = {} unless (defined($self->{LockingPrefix}));
    $self->{LockFileName}  = {} unless (defined($self->{LockFileName}));

    $self->closeLockingPrefix();
    $self->openLockingPrefix();
}

# DoBeforeQuit()
#
# Release the prefix locks and the spool lock, remove the spool lock files
# holding our PID, and raise an alert when the last answer was a file to
# process. Also called by Do before locking a spool.
sub DoBeforeQuit {
    my $self = shift ;

    # Close any prefix and unlock the spool
    if (defined($self->{LOCKEDSPOOL})) {
        # Must be called as a method to release our own prefix locks
        $self->closeLockingPrefix();
        close($self->{LOCKEDSPOOL});
        # Forget the handle so it is not closed again on a later call
        delete $self->{LOCKEDSPOOL} ;
    }

    foreach my $spool ( keys(%{$self->{LockedSpool}}) ) {
        # Get PID locking this spool, it should be me
        my $PID ;
        unless (open( $PID , '<' , $self->{LockedSpool}->{$spool} . ".LCK" )) {
            delete $self->{LockedSpool}->{$spool} ;
            next ;
        }
        my $pid = <$PID> ;
        close($PID);

        next unless (defined($pid));
        chomp($pid);

        if ( $self->{LockedSpool}->{$spool} and $pid eq $$ ) {
            unlink $self->{LockedSpool}->{$spool} . ".LCK" ;
            delete $self->{LockedSpool}->{$spool} ;
        }
    }

    # Check if last response was a file to process to not forget any job
    if ( defined($self->{LASTJOB}) and defined($self->{LASTFILE}) ) {
        # NOTE(review): numeric comparison against a value which can be a
        # file name - confirm against the definition of NOMOREFILE
        if ( $self->{LASTFILE} != NOMOREFILE ) {
            &Alert("Last request was an AFP job to do, you should check " .
                $self->{LASTJOB} . " job has been processed with '" .
                $self->{LASTFILE} . "' AFP file");
        }
    }
}

&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");

1;