source: A2P/a2p/A2P/SpoolManager.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 18.3 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: SpoolManager.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22
23package A2P::SpoolManager;
24
25# Derived class from Thread.pm
26use base qw(A2P::Thread);
27
28use strict;
29use Fcntl;
30use Fcntl          qw( :DEFAULT );
31use Time::HiRes    qw( usleep gettimeofday );
32use A2P::Globals;
33use A2P::Syslog;
34use A2P::Tools   qw( FcntlLocked WaitFcntlLocked FcntlUnlocked);
35use A2P::Com     qw( IsCom comJOB );
36use A2P::JobStatus 'a2pjobstate' ;
37
38BEGIN {
39    our $VERSION = sprintf "%s", q$Rev: 415 $ =~ /(\d[0-9.]+)\s+/ ;
40}
41our $VERSION ;
42
43#+--------------------------------------------------------------------------+
44#|             Private helpers for file locking                             |
45#+--------------------------------------------------------------------------+
46sub get_locked (\$) {
47    &TIMESTAT('LOCKING-FILE');
48    my $self = shift ;
49    my $ref  = shift ;
50    my $ret  = 1 ;
51    my $LCK ;
52    my $prefix   = &prefix($$ref);
53    my $filename = $AFPSPOOL . '/' . $$ref ;
54    $filename =~ s$//$/$g ; # Clean filename
55
56    # First lock on prefix
57    unless ($self->lockprefix($prefix)) {
58        &Debug("prefix '$prefix' not locked");
59        &TIMESTAT('LOCKING-FILE');
60        return 0 ;
61    }
62
63    unless ( -e $filename ) {
64        &Debug("'$filename' not found");
65        $self->unlockprefix($prefix);
66        &TIMESTAT('LOCKING-FILE');
67        return 0 ;
68    }
69
70    my $lockfile = $AFPSPOOL . "/.a2p/" . $$ref . ".LCK" ;
71    sysopen( $LCK , "$lockfile" , O_WRONLY | O_CREAT )
72        or $ret = &Error("Can't open '$lockfile' for writing: $!");
73
74     if ( $ret ) {
75        &Debug("Trying to get a lock on '$lockfile'");
76        if ( $ret and &FcntlLocked($LCK)) {
77            truncate($LCK,0);
78
79            my $newname = $filename ;
80
81            # Replace "prefix." string by LOCKID folder name
82            $newname =~ s|/$prefix\.|/$LOCKID/| ;
83            &Debug("Renaming '$filename' to '$newname'");
84            unless (rename $filename , $newname) {
85                &Debug("Can't rename '$$ref': $!");
86                $ret = 0 ;
87            }
88
89            if ( $ret and -e $newname ) {
90                $newname =~ s,$AFPSPOOL/*,, ; # Keep only filename
91                $$ref = $newname ;
92                &Debug("'$newname' locked");
93
94            } else {
95                &Debug("'$newname' not available");
96                $ret = 0 ;
97            }
98
99        } else {
100            &Debug("Can't lock '$lockfile': $!");
101            $ret = 0 ;
102        }
103
104        # Remove lock file
105        close($LCK);
106        unlink $lockfile ;
107    }
108
109    unless ($self->unlockprefix($prefix)) {
110        &Warn("Can't unlock prefix on $prefix before locking on $filename: $!");
111        &Info("$prefix file must be deleted manually " .
112            "after all concerned services has been stopped");
113    }
114
115    &TIMESTAT('LOCKING-FILE');
116    return $ret ;
117}
118
119sub lockname {
120    my $self = shift ;
121
122    my $LCK ;
123    my $lock = $AFPSPOOL . '/' . $_[0] ;
124
125    &Debug("'$_[0]' seems still locked") if ( -s $lock . '.LCK' );
126
127    sysopen( $LCK , $lock . ".LCK" , O_WRONLY | O_CREAT )
128        or &Error("Can't open '$lock.LCK' for writing: $!"), return "";
129
130    if(!&FcntlLocked($LCK)) {
131        &Error("Can't lock '$lock.LCK': $!");
132        return "";
133    }
134
135    truncate $LCK , 0 ;
136
137    # Print our PID in Spool Lock File
138    print $LCK $$ ;
139
140    $self->{LOCKEDSPOOL} = $LCK ;
141
142    # Create folder where will be moved files if needed
143    if ( ! -d $lock ) {
144        mkdir $lock , 0775
145            or &Error("Can't create '$lock' folder: $!"), return "";
146    }
147
148    return $lock ;
149}
150
151my @prefix = () ;
152sub prefix {
153    my $file = shift ;
154    map { return $1 if $file =~ /^($_)\..+/i } @prefix ;
155    return "" ;
156}
157
158sub update_prefix {
159    # Update prefix array as optimization and SPOOL_PREFIX can has been updated
160    return @prefix = split( /\s+/, $_[0] ) ;
161}
162
163################################################################################
164##          Afpspool file handling code                                       ##
165################################################################################
166sub Do {
167    my $self = shift ;
168    my $ref  = shift ;
169
170    my ( $count , $lastwasmax ) = ( 0 , 1 );
171
172    my $file = "" ;
173    my $AfpName = "" ;
174
175    my @Job = &IsCom( comJOB , $$ref );
176
177    # Default request is give me a filename for that job
178    if ( @Job ) {
179
180        if ( $Job[1] == DONE ) {
181            # Reset last kept information to skip annoying alert before quitting
182            # The JobManager is saying it is managing the file, so it will keep
183            # trace of it even if it should quit or abort
184            ( $self->{LASTJOB} , $self->{LASTFILE} ) = ( $Job[0] , NOMOREFILE );
185            $self->AnswerDone( $Job[0] );
186            return 1 ;
187        }
188
189        # Keep internal stats
190        &UPSTAT('SPOOL_CHECK');
191
192        # Locking AFPSPOOL for our LOCKID
193        if (!defined($self->{LockedSpool}->{$AFPSPOOL})) {
194
195            # Unlock old spool, needed if AFPSPOOL is updated
196            $self->DoBeforeQuit();
197
198            # LOCKID will also be a folder in AFPSPOOL, see &get_locked
199            $self->{LockedSpool}->{$AFPSPOOL} = $self->lockname("$LOCKID") ;
200            if ( $self->{LockedSpool}->{$AFPSPOOL} ) {
201                &Debug("'$AFPSPOOL' locked with '$LOCKID' lockid");
202
203            } else {
204                $self->ThisError("Can't lock '$AFPSPOOL' folder on '$LOCKID'");
205                &Alert("Quitting as AFPSPOOL is not available," .
206                    " maybe LOCKID is still allocated to another service");
207
208                # Tell there's nothing we can do
209                $self->Return( $Job[0] , 1 );
210
211                delete $self->{LockedSpool}->{$AFPSPOOL};
212                $MUSTQUIT ++ ; # Fatal error thread should quit
213            }
214        }
215
216        # Update prefix array as optimization and needed when SPOOL_PREFIX
217        # has been updated
218        &update_prefix($SPOOL_PREFIX);
219
220        &Debug("Finding a file for job $Job[0]");
221
222        while (length($file) == 0) {
223            $count = scalar( @{$self->{AfpFiles}} );
224
225            # Update the cache if needed
226            if ( $count == 0 or ( $count < $MINFOLDERCOUNT and $lastwasmax ) ) {
227                my @LockedFiles = () ;
228                # Set timestamp to MAXRETRYTIMER millisecond earlier
229                my $timestamp   = &gettimeofday()*1000 - $MAXRETRYTIMER ;
230
231                # Spawn command to get files ordered by age from afpspool
232                my $Command = "ls -t1r " . $AFPSPOOL ;
233                &Debug("Starting '$Command' to get file list in spool");
234                open(CMD, "$Command 2>/dev/null |")
235                    or &Alert("Can't start \"$Command\" command"), return ;
236
237                foreach $file (<CMD>) {
238                    chomp($file);
239                    &Debug("Checking '$file' is for us");
240
241                    # Skip our LOCKID locking file
242                    next if ( $file =~ /^$LOCKID\.LCK$/ );
243
244                    # Remove the path from file
245                    $file =~ s/$AFPSPOOL\/*// ;
246
247                    # Keep file only if its prefix match one from the list
248                    my $prefix = &prefix($file) ;
249                    next unless ($prefix);
250
251                    # Keep only not locked files and remove it if in list as
252                    # it is just locked by another thread
253                    if ( $file =~ /^(.*).LCK$/i ) {
254                        $file = $1 ;
255                        push @LockedFiles , $file ;
256                        @{$self->{AfpFiles}} = grep { ! /^$file$/ }
257                            @{$self->{AfpFiles}} ;
258                    }
259
260                    # Skip if file is still in our lists
261                    next if ( defined($self->{AfpName}->{$file}) and
262                        $self->{AfpName}->{$file} );
263
264                    # Same computation as in afpds2tex module
265                    my ( $basename ) = $file =~ /^$prefix\.(.*)$/ ;
266                    ( $AfpName ) = $basename =~ $AFPNAME_REGEX ;
267
268                    unless (defined($AfpName) and $AfpName) {
269                        &Error("Can't set an afpname with '$file', skipping");
270                        next ;
271                    }
272
273                    $self->{AfpName}->{$file} = $AfpName ;
274                    push @{$self->{AfpFiles}} , $file ;
275                    $self->{Timestamp}->{$file} = $timestamp ;
276                    &Debug("'$file' added to listing as '$AfpName' Jobname");
277
278                    last if ( ++$count >= $MAXFOLDERCOUNT );
279                }
280                $lastwasmax = $count >= $MAXFOLDERCOUNT ? 1 : 0 ;
281
282                close(CMD);
283            }
284
285            # Check files availability
286            if (scalar( @{$self->{AfpFiles}} )) {
287                my $found = 0 ;
288                my @keep = () ;
289
290                # Loop to try first file from list
291                while (scalar( @{$self->{AfpFiles}} )) {
292                    $file = shift(@{$self->{AfpFiles}});
293                    $AfpName = $self->{AfpName}->{$file} ;
294
295                    my $file_time = $self->{Timestamp}->{$file} ;
296                    my $can_try = &gettimeofday()*1000 - $file_time ;
297                    # Select available file by trying to lock it
298                    if ( $can_try >= $MAXRETRYTIMER
299                    and $self->get_locked(\$file)) {
300                        # file has been updated by &get_locked
301                        &Debug("Found '$file' for Job $Job[0]");
302                        $found ++ ;
303                        last ;
304
305                    } elsif ( $can_try >= $MAXRETRYTIMER ) {
306                        # Set file tried and will only retry later
307                        $self->{Timestamp}->{$file} += $can_try ;
308                    }
309
310                    if ( -e $file ) {
311                        push @keep, $file ;
312
313                    } else {
314                        delete $self->{Timestamp}->{$file} ;
315                        # Release also the AfpName
316                        delete $self->{AfpName}->{$file} ;
317                    }
318                }
319
320                # Store kept files
321                if ( @keep ) {
322                    &Debug("Files to retry later: @keep");
323                    unshift @{$self->{AfpFiles}}, @keep ;
324                }
325
326                if ($found) {
327                    # Keep internal stats
328                    &UPSTAT('SPOOL_GOT_FILE');
329
330                    # Initialize jobstatus with AfpName to keep restarts in mind
331                    &a2pjobstate( $AfpName , 1 , 'o',
332                        {
333                            AFP    => $AfpName,
334                            JOBID  => $Job[0],
335                            STATUS => 'STARTING',
336                            FILE   => $file,
337                            INFOS  => 'Started'
338                        } )
339                        or &Info("Can't initialize '$Job[0]' job status");
340
341                    # We must remove now unused keys retrieving the first key
342                    map {
343                        # Release this AfpName
344                        delete $self->{AfpName}->{$_} ;
345
346                        # Release cache timestamp
347                        delete $self->{Timestamp}->{$_} ;
348
349                    } grep { $self->{AfpName}->{$_} eq $AfpName }
350                        keys(%{$self->{AfpName}});
351
352                    last ;
353
354                } else {
355                    &Debug("No available file found for Job $Job[0]");
356                    $file = NOMOREFILE ;
357
358                    # Keep internal stats
359                    &UPSTAT('SPOOL_GOT_NOFILE');
360
361                    # Check available timestamp
362                    my $time_count = keys(%{$self->{Timestamp}}) ;
363                    unless ( $time_count == @{$self->{AfpFiles}}) {
364                        my $diff = $time_count - @{$self->{AfpFiles}} ;
365                        &Debug("Found diff between timestamp keys " .
366                          "and available files: $diff", "Keys  = " .
367                          join(' ',keys(%{$self->{Timestamp}})),
368                          "Files = @{$self->{AfpFiles}}");
369                        &Info("Got available files cache incoherence");
370                        $self->{AfpFiles}  = [] ;
371                        $self->{Timestamp} = {} ;
372                        $self->{'AfpName'} = {} ;
373                    }
374                }
375
376            } else {
377                &Debug("No file found for Job $Job[0]");
378                $file = NOMOREFILE ;
379
380                # Keep internal stats
381                &UPSTAT('SPOOL_GOT_NOFILE');
382            }
383
384        } # Until file set or state is nomorefile
385        $self->Return( $Job[0] , $file );
386        $self->AnswerDone( $Job[0] );
387
388        # Keep last information to check it before quitting
389        ( $self->{LASTJOB} , $self->{LASTFILE} ) = ( $Job[0] , $file );
390
391    } else {
392        $self->ThisError("Can't do unsupported request '$$ref'");
393
394        # Keep internal stats
395        &UPSTAT('SPOOL_CHECK_BADREQFORMAT');
396    }
397
398    return $file ;
399}
400
401sub openLockingPrefix {
402    my $self = shift ;
403
404    # First lock the locking folder
405    my $lock = $AFPSPOOL . '/.a2p/.LCK' ;
406    my $LCK ;
407    sysopen( $LCK, $lock , O_WRONLY | O_CREAT )
408        or return &Error("Can't open lock in shared folder in spool: $!");
409
410    return &Error("Can't get lock on shared folder in spool: $!")
411        unless (&WaitFcntlLocked($LCK));
412
413    # Create prefix locking files when required with uppercase filename
414    foreach my $prefix ( map { uc } (@_? @_ : &update_prefix($SPOOL_PREFIX)) ) {
415        my $file = $AFPSPOOL . '/.a2p/' . $prefix . '.LCK' ;
416        # Don't try to reopen file
417        next if (-e $file and defined($self->{LockingPrefix}->{$prefix}));
418        sysopen( $self->{LockingPrefix}->{$prefix} , $file , O_WRONLY|O_CREAT )
419            or &Error("Can't open prefix lock in shared folder: $!");
420        $self->{LockFileName}->{$prefix} = $file ;
421    }
422
423    # Release lock on locking prefix folder
424    return &Error("Can't release lock on shared folder in spool: $!")
425        unless (&FcntlUnlocked($LCK));
426
427    close($LCK)
428        or &Warn("Found problem when closing locking file in spool: $!");
429
430    1 ;
431}
432
433sub closeLockingPrefix {
434    my $self = shift ;
435
436    # First lock the locking folder
437    my $lock = $AFPSPOOL . '/.a2p/.LCK' ;
438    my $LCK ;
439    sysopen( $LCK, $lock , O_WRONLY | O_CREAT )
440        or return &Error("Can't open lock in shared folder in spool: $!");
441
442    return &Error("Can't get lock on shared folder in spool: $!")
443        unless (&WaitFcntlLocked($LCK));
444
445    foreach my $prefix ( keys(%{$self->{LockingPrefix}}) ) {
446        &FcntlUnlocked($self->{LockingPrefix}->{$prefix});
447        close($self->{LockingPrefix}->{$prefix});
448        delete $self->{LockingPrefix}->{$prefix};
449        unlink $self->{LockFileName}->{$prefix};
450    }
451
452    # Release lock on locking prefix folder
453    return &Error("Can't release lock on shared folder in spool: $!")
454        unless (&FcntlUnlocked($LCK));
455
456    close($LCK)
457        or &Warn("Found problem when closing locking file in spool: $!");
458
459    1 ;
460}
461
462sub lockprefix {
463    &TIMESTAT('LOCKING-PREFIX');
464    my $self   = shift ;
465    my $prefix = uc(shift) ;
466
467    sub locktest ;
468    *locktest = $WAIT_ON_PREFIX ? \&WaitFcntlLocked : \&FcntlLocked ;
469
470    &UPSTAT('GET-LOCKING-PREFIX');
471    my ( $try, $locked ) = ( 3, 0 ) ;
472    while ( $try -- ) {
473
474        &openLockingPrefix($self, $prefix)
475            unless ( -e $self->{LockFileName}->{$prefix} );
476
477        unless (defined($self->{LockingPrefix}->{$prefix})) {
478            &openLockingPrefix($self, $prefix);
479            &UPSTAT('BAD-PREFIX-TO-LOCK');
480            next ;
481        }
482
483        if (&locktest($self->{LockingPrefix}->{$prefix})) {
484            &UPSTAT('GOOD-LOCKING-PREFIX');
485            $locked ++ ;
486            last ;
487        }
488
489        &UPSTAT('BAD-LOCKING-PREFIX');
490    }
491
492    &TIMESTAT('LOCKING-PREFIX');
493    return $locked ;
494}
495
496sub unlockprefix {
497    &TIMESTAT('UNLOCKING-PREFIX');
498    my $self   = shift ;
499    my $prefix = uc(shift) ;
500    my $ret    = 1 ;
501    &UPSTAT('GET-PREFIX-UNLOCKING');
502
503    unless (defined($self->{LockingPrefix}->{$prefix})) {
504        &Warn("Can't unlock not defined filehandle");
505        &UPSTAT('BAD-PREFIX-TO-UNLOCK');
506        $ret -- ;
507    }
508
509    if ($ret and &FcntlUnlocked($self->{LockingPrefix}->{$prefix})) {
510        &UPSTAT('GOOD-PREFIX-UNLOCKING');
511    } else {
512        &Warn("Can't unlock prefix '$prefix': $!");
513        &UPSTAT('BAD-PREFIX-UNLOCKING');
514        $ret -- ;
515    }
516
517    &TIMESTAT('UNLOCKING-PREFIX');
518    return $ret ;
519}
520
521sub ThreadInit {
522    my $self = shift ;
523    $self->{AfpFiles}  = [] ;
524    $self->{Timestamp} = {} ;
525    $self->{AfpName}   = {} ;
526
527    $self->{LockingPrefix} = {} unless (defined($self->{LockingPrefix}));
528    $self->{LockFileName}  = {} unless (defined($self->{LockFileName}));
529
530    $self->closeLockingPrefix();
531    $self->openLockingPrefix();
532}
533
534sub DoBeforeQuit {
535    my $self = shift ;
536
537    # Close any prefix and unlock the spool
538    if (defined($self->{LOCKEDSPOOL})) {
539        &closeLockingPrefix();
540        close($self->{LOCKEDSPOOL});
541    }
542
543    map {
544        # Get PID locking this spool, it should be me
545        open PID , $self->{LockedSpool}->{$_} . ".LCK"
546            or delete $self->{LockedSpool}->{$_} ;
547        my ( $pid ) = <PID> ;
548        close PID ;
549        if ( $self->{LockedSpool}->{$_} and $$ =~ /^$pid$/ ) {
550            unlink $self->{LockedSpool}->{$_} . ".LCK" ;
551            delete $self->{LockedSpool}->{$_} ;
552        }
553    } keys(%{$self->{LockedSpool}}) ;
554
555    # Check if last response was a file to process to not forget any job
556    if ( defined($self->{LASTJOB}) and defined($self->{LASTFILE}) ) {
557        if ( $self->{LASTFILE} != NOMOREFILE ) {
558            &Alert("Last request was an AFP job to do, you should check " .
559                $self->{LASTJOB} . " job has been processed with '" .
560                $self->{LASTFILE} . "' AFP file");
561        }
562    }
563}
564
565&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
566
5671;
Note: See TracBrowser for help on using the repository browser.