source: A2P/a2p/A2P/JobManager.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 43.4 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: JobManager.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22
23package A2P::JobManager;
24
25# Derived class from Thread.pm
26use base qw(A2P::Thread);
27
28use strict ;
29use integer ;
30use Time::HiRes qw( gettimeofday tv_interval );
31use A2P::Globals ;
32use A2P::Syslog ;
33use A2P::Signal ;
34use A2P::Job ;
35use A2P::Thread ;
36use A2P::Com qw( IsCom GetCom comJOB comASK comSAY comDONE comINF
37               comREQ );
38
39BEGIN {
40    our $VERSION = sprintf "%s", q$Rev: 1399 $ =~ /(\d[0-9.]+)\s+/ ;
41}
42our $VERSION ;
43
44# Jobs management vars and subs
45my @Jobs ; # List of jobs waiting for an action
46my @SlowJobs ; # List of jobs waiting slowly for an action
47my %Jobs ; # List of current jobs objects, referenced by their current ID
48my %Cancels ; # List of cancelled jobs
49
50sub is_job {
51    return defined($_[0]) ?
52        ( ref($Jobs{$_[0]}) =~ /^A2P::Job$/ ? 1 : 0 ) : 0 ;
53}
54
55sub JobCount {
56    return grep {
57        defined($Jobs{$_}) and $Jobs{$_}->geta2pjob eq ""
58            and $Jobs{$_}->is_not_abterm
59            # Don't count splitted jobs as they are slower to handle
60            and ! $Jobs{$_}->is_splitted
61            and $Jobs{$_}->getstep < $CLEAN_OPTIMIZATION ? 11 : 13
62            and ! exists($Cancels{$_})
63        } keys(%Jobs);
64}
65
66sub ActiveJobCount {
67    return grep {
68        defined($Jobs{$_}) and $Jobs{$_}->geta2pjob eq "" and
69            $Jobs{$_}->is_not_abterm
70        } keys(%Jobs);
71}
72
73sub delete_job {
74    my $jid = shift ;
75    return unless (&is_job($jid));
76
77    # Check if we are deleting an AFP job
78    unless ( $Jobs{$jid}->geta2pjob ) {
79        # Empty waiting jobs list from the Jids
80        @Jobs = grep { $_ !~ /^$jid(-\d+){0,1}$/ } @Jobs ;
81        @SlowJobs = grep { $_->[0] !~ /^$jid(-\d+){0,1}$/ } @SlowJobs ;
82
83        # Delete clone objects before afpjob
84        map { delete $Jobs{$_} }
85            ( $Jobs{$jid}->getclones, $Jobs{$jid}->getfinishedclones );
86    }
87
88    delete $Jobs{$jid} ;
89}
90
91sub EnqueueJob {
92    my $jid = shift ;
93    return unless ( &is_job($jid) and ! grep { $_ eq $jid } @Jobs );
94    push @Jobs , $jid ;
95}
96
97sub SlowEnqueueJob {
98    my $jid = shift ;
99    return unless ( &is_job($jid) and ! grep { $_->[0] eq $jid } @SlowJobs );
100    my $factor = shift || 1 ;
101    &UPSTAT('SLOW-ENQUEUED');
102    $factor = 10 unless ( $factor < 10 );
103    &MAXSTAT('SLOW-QUEUE-FACTOR',$factor);
104    push @SlowJobs, [ $jid, [ &gettimeofday() ], $factor ] ;
105}
106
107################################################################################
108##             Job Manager code                                               ##
109################################################################################
110sub Do {
111    my $self = shift ;
112    my $ref  = shift ;
113
114    my $Ret = 0 ;
115
116    my @Job = &IsCom( comJOB , $$ref );
117
118    if ( @Job == 2 ) {
119        my ( $Jid , $Answer ) = @Job ;
120
121        &Debug("Received request for job '$Jid'");
122
123        if (defined($Jobs{$Jid})) {
124
125            if ( &IsCom( comINF , $Answer ) ) {
126                &UPSTAT('GET-COMINF');
127                my @infos = &IsCom( comINF , $Answer ) ;
128                my $info = join(": ",@infos);
129
130                # This information should return to client immediatly
131                if ( $Jobs{$Jid}->getClientRef() ) {
132                    # This is only some information during job processing
133                    $Jobs{$Jid}->jobdebug("Saying '$info' to " .
134                        $Jobs{$Jid}->getClientRef());
135                    $self->Request( Listener => &GetCom( comSAY ,
136                        $Jobs{$Jid}->getClientRef() => $info ) );
137
138                } elsif ( $infos[0] =~ /^ABTERM\s(\d+)$/i ) {
139                    $Jobs{$Jid}->error( $1, "ABTERM: " . $infos[1] );
140
141                } else {
142                    $Jobs{$Jid}->jobinfo($info);
143                }
144
145            } else {
146                # This is an expected answer
147                if ( $Answer =~ /^\d+$/ ) { # is_numeric
148                    $Jobs{$Jid}->setstate($Answer);
149
150                } else {
151                    $Jobs{$Jid}->setanswer($Answer);
152                }
153
154                # Update current pending requests number
155                if ($self->{PENDING_REQUEST}) {
156                    $self->{PENDING_REQUEST} -- ;
157                    &UPSTAT('pending-REQUEST',-1);
158                }
159
160                # Processing for this job
161                $Ret = $self->ManageJobs($Jid);
162
163            }
164            $Ret ++ unless $Ret ;
165
166        } else {
167            &Warn("Don't know anything of $Jid job in '$$ref' request");
168        }
169
170    } elsif ( $$ref =~ m|START|i ) {
171        if ( $SCAN_SPOOL ) {
172            &Debug("Starting to get files from spool");
173            $do_file ++ ;
174        }
175        $Ret = $self->Answer('STARTED');
176
177    # Is this a question ?
178    } elsif ( my @Ask = &IsCom( comASK , $$ref ) ) {
179        my $name = $Ask[0] ;
180        &Debug("Received question '$Ask[1]' from $name");
181
182        if ( @Job = &IsCom( comJOB , $Ask[1] )) {
183            # Prepare default returned value
184            my $thisjob = $Job[0] ;
185
186            # Check if it's a cancel request
187            if ( $Job[1] =~ /^cancel$/i ) {
188                &Debug("Got cancelling request for $thisjob job");
189                return $self->CancelJob( $name, $thisjob );
190            }
191
192            # Finally, create an a2p job and enqueue it in our job list
193            my $job = new A2P::Job( $thisjob , $name );
194
195            if ($job) {
196                $job->setstate(0);
197                $job->setanswer($Job[1]);
198                $job->setstep(0);
199                $job->start_ms
200                    and $job->nextstep("Got file to convert");
201
202                # Starting new job
203                $Jobs{$thisjob} = $job ;
204                $Ret = $self->ManageJobs($thisjob);
205
206            } else {
207                &Debug("Reporting error to $name");
208                $self->Request( Listener => &GetCom( comSAY ,
209                    $name => 'ABORTING RESTART ON RUNNING JOB' ) );
210                $self->Request( Listener => &GetCom( comSAY  ,
211                    $name => &GetCom( comDONE , '' => "JOBSTATUS: KO" ) ));
212
213                $Ret = $thisjob ;
214            }
215
216        } else {
217            $self->ThisError("[DEV] Unsupported '$Ask[1]' question from $name");
218        }
219
220    } else {
221        $self->ThisError("Got bad request '$$ref'");
222    }
223
224
225    return $Ret ;
226}
227
228sub CancelJob {
229    my $self = shift ;
230    my $name = shift ;
231    my $thisjob = shift ;
232
233    if (&is_job($thisjob)) {
234        &Info("trying to cancel $thisjob job as asked by $name");
235        if ( $Cancels{$thisjob} = $Jobs{$thisjob}->cancel ) {
236           $Cancels{$thisjob}->jobinfo("Cancel is pending");
237            $self->Request( Listener => &GetCom( comSAY ,
238                $name => 'CANCEL PENDING' ) );
239            $self->Request( Listener => &GetCom( comSAY  ,
240                $name => &GetCom( comDONE , '', "CANCEL: OK" ) ));
241
242        } else {
243            # Still delete this job if nothing more is expected
244            &delete_job($thisjob);
245            delete $Cancels{$thisjob} ;
246            $self->Request( Listener => &GetCom( comSAY ,
247                $name => 'CANCEL NOTHING OR TOO LATE' ) );
248            $self->Request( Listener => &GetCom( comSAY  ,
249                $name => &GetCom( comDONE , '', "CANCEL: KO" ) ));
250        }
251
252    } elsif (exists($Cancels{$thisjob})) {
253        &Debug("Can't cancel still cancelled $thisjob job");
254        $self->Request( Listener => &GetCom( comSAY ,
255            $name => 'STILL CANCELLING JOB' ) );
256        $self->Request( Listener => &GetCom( comSAY  ,
257            $name => &GetCom( comDONE , '', "CANCEL: OK" ) ));
258
259    } else {
260        &Warn("Can't cancel no more managed $thisjob job");
261        $self->Request( Listener => &GetCom( comSAY ,
262            $name => 'ABORTING CANCEL ON UNAVAILABLE JOB' ) );
263        $self->Request( Listener => &GetCom( comSAY  ,
264            $name => &GetCom( comDONE , '', "CANCEL: KO" ) ));
265    }
266
267    return $thisjob ;
268}
269
270my $lastcheck = [ &gettimeofday() ];
271sub GetRequest {
272    my $self     = shift ;
273    my $Requests = shift ;
274    my $Ret      = undef ;
275    my @JOBS ;
276
277    #######################
278    # Check flags
279    #######################
280
281    {
282        # Generate new job at SPOOL_TIME time
283        no integer ;
284        if ( &tv_interval($lastcheck) > $SPOOL_TIMER ) {
285            $lastcheck = [ &gettimeofday() ];
286            $do_file ++ ;
287        }
288    }
289
290    if ( $do_file and $SCAN_SPOOL ) {
291
292        @JOBS = &JobCount() ;
293        $STATS{'COUNTED-JOBS'} = "@JOBS" ;
294
295        if ( @JOBS < $MAXJOBS ) {
296            # Create an a2p job and enqueue it in our job list
297            my $job = new A2P::Job ;
298
299            # Enqueing new job
300            $Jobs{$job->getid} = $job ;
301            &EnqueueJob( $job->getid );
302
303            # Don't decrease do_file if many jobs are active as this means
304            # we can burst job creation as AFP files should be availables
305            $do_file -- unless ( &JobCount > 1 and &JobCount < $MAXJOBS );
306
307        } else {
308            &Debug("Skip new job creation as there's still enough job running");
309            $do_file -- ;
310        }
311    }
312
313    # Check to do some jobs and com for that jobs
314    $self->{DONTSLEEP} = 0 ;
315    $self->Request();
316    $self->ManageJobs();
317
318    @JOBS = &ActiveJobCount() ;
319    $STATS{'ACTIVES-JOBS'} = "@JOBS" ;
320    &MAXSTAT('ACTIVES-JOBS-COUNT',scalar(@JOBS));
321
322    # Really quit when MAXJOBS has been set to 0 and there's no more job running
323    $self->{DO_QUIT} = 1
324        unless ( $MAXJOBS
325        # Check if pending jobs are not cancelled
326        or ( @JOBS and @JOBS != grep { exists($Cancels{$_}) } @JOBS ));
327
328    return $MUSTQUIT ? 1 : $self->ReadSock($Requests) ;
329}
330
331sub ReturnInfoToClient {
332    # Return if not a client job
333    return unless $Jobs{$_[1]}->getClientRef() ;
334
335    # In case Job is requested by a client, send info to client
336    &Debug("Checking if $_[1] is a client to return to some info");
337    my @info = $Jobs{$_[1]}->getClientInfo ;
338    if ( @info ) {
339        my $Ref = shift @info ;
340        # Array becomes empty while there's no info
341        while ( @info ) {
342            my $info = shift @info ;
343            $Jobs{$_[1]}->jobdebug("Saying '$info' to $Ref");
344            $_[0]->Request( Listener => &GetCom( comSAY , $Ref => $info ) );
345        }
346    }
347
348    # Check if we need to close conversation
349    if ( $Jobs{$_[1]}->isstep(13) or
350            ( $Jobs{$_[1]}->isstep(12) and $Jobs{$_[1]}->getstate == DONE )) {
351        my $Ref = $Jobs{$_[1]}->getClientRef() ;
352        $Jobs{$_[1]}->unsetClientRef() ;
353        $_[0]->Request(
354            Listener => &GetCom( comSAY  ,
355                $Ref => &GetCom( comDONE , '' => "JOBSTATUS: " .
356                    ($Jobs{$_[1]}->error()?'KO':'OK') ) ));
357    }
358}
359
360sub loopdebug {
361    my $self = shift ;
362    &Debug("Current active jobs: " . &ActiveJobCount(),
363        "Current cached requests:",@{$self->{REQUESTS_ARRAY}});
364    $self->commonloopdebug();
365}
366
367sub ManageJobs {
368    my $self = shift ;
369    my $Jid  = shift ;
370
371    ####################################################################
372    # Check if we can get next job from SlowJobs list
373    ####################################################################
374    my $slowfactor = 0 ;
375    if ( @SlowJobs and ! defined($Jid) ) {
376        my $slowref = shift @SlowJobs ;
377        $Jid = $slowref->[0] ;
378        my @timeout = @{$slowref->[1]} ;
379        my $delay   = $slowref->[2]++ ;
380        $delay *= $USLEEP / 1_000_000 ;
381
382        # The rest of the array is a timer
383        if ( &tv_interval(@timeout) < $delay ) {
384            push @SlowJobs, $slowref ;
385            undef $Jid ;
386
387        } else {
388            # Set slowfactor in case we need it with SlowEnqueing
389            $slowfactor = $slowref->[2] ;
390        }
391    }
392
393    ##################################
394    # Get one Job Id if not a new one
395    ##################################
396    $Jid = shift @Jobs unless (defined($Jid));
397
398    return 1 unless $Jid ;
399
400    return &Warn("$Jid is not in my jobs list")
401        unless (&is_job($Jid));
402
403    # Compute a management rate for internal statistics
404    if ($self->{MANAGEJOBS_COUNT} ++
405    and &tv_interval($self->{MANAGEJOBS_TIME}) > 10 ) {
406        my $rate = $self->{MANAGEJOBS_COUNT} /
407            &tv_interval($self->{MANAGEJOBS_TIME}) ;
408        &MAXSTAT('MANAGEJOBS-RATE',sprintf("%.1f",$rate));
409        $self->{MANAGEJOBS_COUNT} = 0 ;
410        $self->{MANAGEJOBS_TIME} = [ &gettimeofday() ] ;
411    }
412
413    # As we have tasks to do, avoid to sleep on main loop
414    $self->{DONTSLEEP} ++
415        unless ($self->{LASTJOB} eq $Jid);
416
417    $self->{LASTJOB} = $Jid ;
418
419    my $Job = $Jobs{$Jid} ;
420
421    $self->ReturnInfoToClient( $Jid ) unless ($Job->isstep(12));
422
423    # Check if job is in an agregation error case to shorten processing
424    goto JOBDONE if ( $Job->badagregation and ! $Job->isstep(10) );
425
426    ############################################################################
427    # Step 0: Job just created, it need a file, so give it to SpoolManager
428    ############################################################################
429    if ( $Job->isstep(0) ) {
430        $Job->jobdebug("Starting Job $Jid with a2p v" . A2P_RPM_VERSION );
431        $Job->jobinfo("Checking one file from AFPSPOOL");
432        $Job->start_ms
433            and $Job->nextstep("Waiting for a filename");
434        $self->Request( SpoolManager => $Job->request );
435
436    ############################################################################
437    # Step 1: A2P Job returned with a filename or with NOMOREFILE
438    ############################################################################
439    } elsif ( $Job->isstep(1) ) {
440        $Job->stop_ms; # Stop chrono
441
442        if ( $Job->getstate == NOMOREFILE ) {
443            $Job->jobinfo("No file to convert");
444            $Job->releaseinfo ;
445            &delete_job($Jid);
446
447            &UPSTAT('NOAFPJOB');
448
449        } elsif ( $Job->getstate == 0 ) {
450            $Job->releaseinfo ;
451
452            # Have a file to convert
453            my $file = $Job->setfile( $Job->getanswer );
454            $Job->start_ms
455                and $Job->nextstep("Converting '$file' to TeX format");
456            $self->Request( Converter => $Job->toconverter );
457
458            # Also say to SpoolManager its job is done and we manage now here
459            # any information about the job and its AFP file
460            $self->Request( SpoolManager => $Job->request( DONE ))
461                if ( $SCAN_SPOOL and ! $Job->is_splitted ) ;
462
463            &UPSTAT('STARTEDAFPJOB');
464
465        } else {
466            $Job->jobalert( $Job->getstate,
467                "ABTERM: Can't get a file from AFPSPOOL");
468            $MUSTQUIT ++ ;
469        }
470
471    ############################################################################
472    # Step 2: A2P Job returned from a converter
473    ############################################################################
474    } elsif ( $Job->isstep(2) ) {
475
476        if ( $Job->getstate ) {
477            # Check if we need to split AFP file before stopping this step
478            my $do_split = $Job->split_cmd ;
479            if (defined($do_split)) {
480                if ($do_split =~ /^\d+$/) {
481                    if ($do_split) {
482                        $Job->error( $do_split, "ABTERM: Splitting error");
483
484                    } else {
485                        # Not ready to split, so process later
486                        &SlowEnqueueJob($Jid,$slowfactor);
487                    }
488
489                } else {
490                    $self->Request( BackEnd => $do_split );
491                }
492
493                # Still return when no error found in splitting
494                return 1 unless $Job->error ;
495            }
496
497            $Job->stop_ms; # Stop chrono
498
499            # Wait for all TeX jobs to be done
500            $Job->a2pjob_progress_init ;
501            $Job->start_ms
502                and $Job->setstep(10);
503
504            # Initializes sequence computation
505            $Job->sequence_ready();
506
507            if ($Job->getstate == DONE) {
508                $Job->jobinfo("Waiting after cloned job");
509
510            } else {
511                # Error
512                $Job->error( $Job->getstate, "ABTERM: Bad conversion")
513                    unless ( $Job->error >= 90 and $Job->error < 100 );
514                $Job->jobinfo("Waiting any cloned job after conversion ABTERM");
515
516                # Update files to clean
517                $Job->setconversionerrorcleaning ;
518            }
519
520            &EnqueueJob($Jid);
521
522        } else {
523            # Have a converted file, create a new job starting at step 3
524            # Or this is just a folder name to add in todelete list
525            # Or also it can be a split event
526            if ( -d $Job->getanswer ) {
527                $Job->setjobdir( $Job->getanswer );
528                return 1 ;
529            }
530
531            # Create a cloned job to continue steps
532            my $job = $Job->newjob() ;
533
534            # We reach a split event if job is undefined
535            unless (defined($job)) {
536
537                # Re-enqueue this job as it should be set DONE now
538                $Job->setstate(DONE);
539                &EnqueueJob($Jid);
540
541                # Get the tag to save this AFP as with Tag
542                my $tag    = $Job->{'SPLIT_TAG'}  ;
543                my $newtag = $Job->{'SPLIT_TAG2'} ;
544
545                # Then create an a2p job and enqueue it in our job list for the
546                # same file but starting at a given position
547                $job = new A2P::Job ;
548                $Jobs{$job->getid} = $job ;
549
550                # Initialize job from splitted job
551                $job->split_from( $Job );
552
553                # Starting new job by enqueing it
554                $job->setstep(0);
555                $job->start_ms
556                    and $job->nextstep("New AFP to convert $newtag jobs" .
557                        " from " . $job->getfilestart .
558                        " pos in original AFP file");
559
560                # Initialize jobstatus with AfpName to keep restarts in mind
561                # as step 1 done as if it was set by SpoolManager
562                my ( $AfpName ) = $Job->getfile =~ $AFPNAME_REGEX ;
563                $job->jobstatus( 'o',
564                    {
565                        AFP    => $AfpName.'.'.$newtag,
566                        JOBID  => $job->getid,
567                        STATUS => 'STARTING',
568                        FILE   => $Job->getfile.'.'.$newtag,
569                        INFOS  => 'Started'
570                    } )
571                    or &Info("Can't initialize '".$job->getid."' status");
572
573                &EnqueueJob($job->getid);
574
575                # Still return after handling the split
576                return 1 ;
577            }
578
579            &UPSTAT('STARTEDJOB');
580
581            $Job->jobdebug("Cloned as job " . $job->getid .
582                " to process " . $job->getfile );
583
584            my $jid = $job->getid ;
585            $Jobs{$jid} = $job ;
586
587            # Update job status now to set the DESTID
588            $job->jobstatus( '.',
589                {
590                    DESTID => $job->getdestid ,
591                    JOBNAME=> $job->getjobname ,
592                    STATUS => "STARTED"
593                })
594                or &Info("Can't set job status at step 3");
595
596            if ($job->do_pdf or $ONLY_DO_PDF) {
597                $job->start_ms
598                    and $job->nextstep("Generating PDF " .
599                        ( $job->getClientRef() ?
600                            "'" . $job->getPdfName() . "'" : "file" ));
601
602
603                $self->Request( BackEnd => $job->pdf_cmd );
604
605            } else {
606                $Job = $job ;
607                $Jid = $jid ;
608                $Job->stepskip("No pdf file required, skipped");
609                goto STEP4 ;
610            }
611        }
612
613    ############################################################################
614    # Step 3: Pdf file done or not needed, check to do DVI
615    ############################################################################
616    } elsif ( $Job->isstep(3) ) {
617        if ($Job->do_pdf or $ONLY_DO_PDF) {
618            # Still rename PDF to its final name
619            $Job->rename_pdf ;
620
621            $Job->stop_ms ; # Stop chrono
622
623            # Check state
624            if ( $Job->getstate != DONE ) {
625                $Job->error( $Job->getstate, "ABTERM: Bad PDF generation");
626                # Any error disable next processing, see A2P::Job->error
627
628                # We must also set an error to diasable any sequence processing
629                $Job->error( 701, "Aborting the printing sequence" );
630
631            } else {
632                $Job->stepdone ;
633            }
634
635            # Finish immedialy this job if needed
636            if ( $ONLY_DO_PDF ) {
637                $Job->setstep(10);
638                goto JOBDONE  ;
639            }
640        }
641STEP4:
642        if ($Job->do_pcl or $Job->do_vpcl or $Job->do_ps) {
643            $Job->start_ms
644                and $Job->nextstep("Converting to DVI format");
645            $self->Request( BackEnd => $Job->dvi_cmd );
646
647        } else {
648            $Job->stepskip("No dvi format required, skipped");
649            # Nothing to print so jump directly to step 10
650            $Job->setstep(9);
651            goto STEP10 ;
652        }
653
654    ############################################################################
655    # Step 4: DVI file done or not needed, check to do PCL
656    ############################################################################
657    } elsif ( $Job->isstep(4) ) {
658        if ($Job->do_pcl or $Job->do_vpcl or $Job->do_ps) {
659            $Job->stop_ms ; # Stop chrono
660
661            # Check state
662            if ( $Job->getstate != DONE) {
663                $Job->error($Job->getstate,"ABTERM: Bad DVI format conversion");
664                # Any error disable next processing, see A2P::Job->error
665
666            } else {
667                $Job->stepdone ;
668            }
669        }
670STEP5:
671        if ($Job->do_pcl) {
672            $Job->start_ms
673                and $Job->nextstep("Generating PCL format");
674            $self->Request( BackEnd => $Job->pcl_cmd );
675
676        } else {
677            $Job->stepskip("No pcl format required, skipped");
678            goto STEP6 ;
679        }
680
681    ############################################################################
682    # Step 5: PCL file done or not needed, check to do VPCL
683    ############################################################################
684    } elsif ( $Job->isstep(5) ) {
685        if ($Job->do_pcl) {
686            $Job->valid_pcl; # Check PCL file
687            $Job->stop_ms ; # Stop chrono
688
689            # Check state
690            if ( $Job->getstate != DONE) {
691                $Job->error($Job->getstate,"ABTERM: Bad PCL format generation");
692                # Any error disable next processing, see A2P::Job->error
693
694            } else {
695                $Job->stepdone ;
696            }
697        }
698STEP6:
699        if ($Job->do_vpcl) {
700            my $vpcl_cmd = $Job->vpcl_cmd ;
701
702            # Only ask to generate vpcl if we need a different pcl than previous
703            if ( $vpcl_cmd ) {
704                $Job->start_ms
705                    and $Job->nextstep("Generating PCL format for validation");
706                $self->Request( BackEnd => $vpcl_cmd );
707
708            } else {
709                $Job->stepskip("Validation PCL still generated " .
710                    "during previous step");
711                goto STEP7 ;
712            }
713
714        } else {
715            $Job->stepskip("No validation pcl file required, skipped");
716            goto STEP7 ;
717        }
718
719    ############################################################################
720    # Step 6: VPCL file done or not needed, check to do PS
721    ############################################################################
722    } elsif ( $Job->isstep(6) ) {
723        if ($Job->do_vpcl) {
724            $Job->valid_vpcl; # Check PCL file
725            $Job->stop_ms ; # Stop chrono
726
727            # Check state
728            if ( $Job->getstate != DONE) {
729                $Job->error( $Job->getstate,
730                    "ABTERM: Bad PCL format for validation");
731                # Any error disable next processing, see A2P::Job->error
732
733            } else {
734                $Job->stepdone ;
735            }
736        }
737STEP7:
738        if ($Job->do_ps and ! $Job->do_pcl) {
739            $Job->start_ms
740                and $Job->nextstep("Generating PS format");
741            $self->Request( BackEnd => $Job->ps_cmd );
742
743        } else {
744            $Job->stepskip("No PS file required, skipped");
745            $Job->jobinfo("DESTID is " . $Job->getdestid ) if ($Job->getdestid);
746            goto STEP8 ;
747        }
748
749        $Job->jobinfo("DESTID is " . $Job->getdestid ) if ($Job->getdestid);
750        &EnqueueJob($Jid);
751
752    ############################################################################
753    # Step 7: PS file done or not needed, check to do PRINT
754    ############################################################################
755    } elsif ( $Job->isstep(7) ) {
756        if ($Job->do_ps and ! $Job->do_pcl and $Job->stepnotdone) {
757            $Job->stop_ms ; # Stop chrono
758
759            # Check state
760            if ( $Job->getstate != DONE) {
761                $Job->error( $Job->getstate, "ABTERM: Bad PS format");
762                # Any error disable next processing, see A2P::Job->error
763
764            } else {
765                $Job->stepdone ;
766            }
767        }
768STEP8:
769        my $later = 0 ;
770        # do_lpr must be in sequence
771        if ($Job->do_lpr(\$later)) {
772            $Job->start_ms
773                and $Job->nextstep( ( $Job->next_in_sequence ?
774                    "Agregating for " : "Printing to " ) . $Job->printer );
775            $self->Request( BackEnd => $Job->lpr_cmd );
776
777        # Just enqueue again this job step if not its turn
778        } elsif ($later) {
779            $Job->jobdebug("Waiting for print sequence");
780            # Set it waiting if the sequence is known as this means this is not
781            # our turn to print
782            if ($Job->sequence_ready()) {
783                return $Job->waiting(1);
784
785            } else {
786                &EnqueueJob($Jid);
787            }
788
789        } else {
790            if ($Job->error) {
791                # Need to check sequence if we pass here after a error in the
792                # middle of a sequence
793                &EnqueueJob( $Job->get_next_authorized )
794                    if ( $Job->remove_from_sequence_but_authorize_next );
795            }
796
797            $Job->stepskip("No print required, skipped");
798            goto STEP9 ;
799        }
800        # Here the job is not waiting
801        $Job->waiting(0);
802
803    ############################################################################
804    # Step 8: Print done or not needed, check to do VPRINT
805    ############################################################################
806    } elsif ( $Job->isstep(8) ) {
807        if ($Job->do_lpr and $Job->stepnotdone) {
808            $Job->stop_ms ; # Stop chrono
809
810            # Print is in the print spool we can authorize next print
811            # in the sequence
812            &EnqueueJob( $Job->get_next_authorized )
813                if ( $Job->authorize_next_print and $Job->next_is_waiting );
814
815            # Check state
816            if ( $Job->getstate != DONE ) {
817                $Job->error( $Job->getstate, "ABTERM: Print Error");
818                # Any error disable next processing, see A2P::Job->error
819
820            } else {
821                $Job->stepdone ;
822            }
823        }
824STEP9:
825        my $later = 0 ;
826        if ($Job->do_vlpr(\$later)) {
827            $Job->start_ms
828                and $Job->nextstep( ( $Job->next_in_sequence ?
829                    "Agregating validation for " : "Printing for validation to "
830                    ) . $Job->vprinter );
831            $self->Request( BackEnd => $Job->vlpr_cmd );
832
833        # Just enqueue again this job step if not its turn
834        } elsif ($later) {
835            $Job->jobdebug("Waiting for validation print sequence");
836            # Set it waiting if the sequence is known as this means this is not
837            # our turn to print
838            if ($Job->sequence_ready()) {
839                return $Job->waiting(1);
840
841            } else {
842                &EnqueueJob($Jid);
843            }
844
845        } else {
846            if ($Job->error) {
847                # Need to check sequence if we pass here after a error in the
848                # middle of a sequence
849                &EnqueueJob( $Job->get_next_authorized )
850                    if ( $Job->remove_from_sequence_but_authorize_next );
851            }
852
853            $Job->stepskip("No validation print required, skipped");
854            goto STEP10 ;
855        }
856        $Job->waiting(0);
857
858    ############################################################################
859    # Step 9: VPrint done or not needed, check to archive
860    ############################################################################
861    } elsif ( $Job->isstep(9) ) {
862
863        if ($Job->do_vlpr) {
864            $Job->stop_ms ; # Stop chrono
865
866            # Print is in the print spool we can authorize next print
867            # in the sequence
868            &EnqueueJob( $Job->get_next_authorized )
869                if ( $Job->authorize_next_print and $Job->next_is_waiting );
870
871            # Check state
872            if ( $Job->getstate != DONE ) {
873                $Job->error( $Job->getstate, "ABTERM: Validation Print error");
874                # Any error disable next processing, see A2P::Job->error
875
876            } else {
877                $Job->stepdone ;
878            }
879        }
880STEP10:
881        if ($Job->do_arch) {
882            $Job->start_ms
883                and $Job->nextstep("Archiving PDF file");
884
885            if ( $Job->get_arch_file ) {
886                $self->Request( Archiver => $Job->get_arch_file );
887
888            } else {
889                $Job->error( 110,
890                    "Need to archive something but not arch base available");
891                goto JOBDONE ;
892            }
893
894        } elsif ($Job->do_eservice) {
895            $Job->start_ms
896                and $Job->nextstep("Transmitting PDF file to '" .
897                    $Job->eservice_name . "'");
898
899            if ( $Job->eservice_request ) {
900                $self->Request( EService => $Job->eservice_request );
901
902            } else {
903                $Job->error( 110,
904                    "Need to transmit something but no content available");
905                goto JOBDONE ;
906            }
907
908        } else {
909            $Job->stepskip("No archiving required, skipped");
910            goto JOBDONE ;
911        }
912
913    ############################################################################
914    # Step 10: Waiting for cloned job to be processed
915    ############################################################################
916    } elsif ( $Job->isstep(10) ) {
917JOBDONE:
918        my $a2pjob = $Job->geta2pjob ;
919
920        #-------------------------------------------------
921        # It's cloned job waited after by its A2P job
922        #-------------------------------------------------
923        if (length($a2pjob)) {
924
925            if ($Job->do_arch) {
926                $Job->stop_ms ; # Stop chrono
927
928                # Check state
929                if ( $Job->getstate != DONE ) {
930                    $Job->error( $Job->getstate, "ABTERM: Archiver error");
931                    # Any error disable next processing, see A2P::Job->error
932
933                } else {
934                    $Job->stepdone ;
935                }
936
937                # Check if we need to pass again to step 10 for E-Service...
938                if ($Job->do_eservice) {
939                    $Job->setstep(9) ;
940                    goto STEP10 ;
941                }
942
943            } elsif ($Job->do_eservice) {
944                $Job->stop_ms ; # Stop chrono
945
946                # Check state
947                if ( $Job->getstate != DONE ) {
948                    $Job->error( $Job->getstate, "ABTERM: '" .
949                        $Job->eservice_name . "' E-Service error");
950                    # Any error disable next processing, see A2P::Job->error
951
952                } else {
953                    $Job->stepdone ;
954                }
955            }
956
957            if (defined($Jobs{$a2pjob})) {
958                # Check error to update a2pjob error state
959                $Jobs{$a2pjob}->error( $Job->error,
960                    "Cloned job $Jid gives error on step " .
961                    $Job->getsteperror . ": " . join(', ',$Job->geterrors))
962                        if ($Job->error);
963
964                # Update parent job even if any other clone produced error
965                $Jobs{$a2pjob}->todelete($Job->get2delete);
966                $Jobs{$a2pjob}->toclean($Job->get2clean);
967                $Jobs{$a2pjob}->tomove($Job->get2move);
968
969            } else {
970                $Job->error( 10 , "AFPFile job object has been released" );
971            }
972
973            # Keep statistics from this job before delete it
974            $Job->setstate( $Job->error ? $Job->error : DONE );
975            $self->KeepStats( $Job->getstatistics() );
976
977            # Now we can destroy the clone as we kept all needed information
978            $self->ReturnInfoToClient( $Jid );
979
980            # Freeing job and sanity
981            $Jobs{$a2pjob}->jobdone($Job) if (defined($Jobs{$a2pjob}));
982            $Job->freeparent ;
983            &delete_job($Jid);
984
985            &UPSTAT('RELEASEDJOB');
986
987        #--------------------------------------------
988        # It's a waiting A2P job
989        #--------------------------------------------
990        } else {
991            if ( $Job->nomorejob ) {
992                #
993                # Cloned jobs are finished
994                #
995                $Job->stop_ms; # Stop chrono
996
997                $Job->stepdone ;
998
999                # Release memory of clones unless error are present to keep a
1000                # dump of them in sent alert
1001                $Job->clearjobsdone unless $Job->error ;
1002
1003                $Job->start_ms
1004                    and $Job->nextstep("Cleaning environment");
1005
1006                # Preparing to convert another file immediatly, not waiting the
1007                # previous job to be cleaned
1008                $do_file ++ if $CLEAN_OPTIMIZATION ;
1009
1010            } else {
1011                $Job->a2pjob_progress ;
1012            }
1013
1014            &EnqueueJob($Jid);
1015        }
1016
1017    ############################################################################
1018    # Step 11: Environment is clean, next is to purge
1019    ############################################################################
1020    } elsif ( $Job->isstep(11)) {
1021
1022        # Check to send a move command
1023        if ($Job->do_move_cmd) {
1024            my $cmd = $Job->move_files_cmd || "" ;
1025            # Send now any non empty move command if not still done
1026            if ($cmd){
1027                $self->Request( BackEnd => $cmd );
1028                &UPSTAT('MOVE_CMDS');
1029            } else {
1030                &EnqueueJob($Jid);
1031                &UPSTAT('BAD_MOVE_CMDS');
1032            }
1033
1034        } elsif ( $Job->do_clean_cmd ) {
1035            my $cmd = $Job->clean_cmd || "" ;
1036            # Send now any clean command if not still done
1037            if ($cmd) {
1038                $self->Request( BackEnd => $cmd );
1039                &UPSTAT('CLEAN_CMDS');
1040            } else {
1041                &EnqueueJob($Jid);
1042                &UPSTAT('BAD_CLEAN_CMDS');
1043            }
1044
1045        } else {
1046            # Cleaning is finished
1047            $Job->stop_ms; # Stop chrono
1048
1049            if ( $Job->getstate != DONE ) {
1050                $Job->error( $Job->getstate, "ABTERM: Error while cleaning");
1051                $Job->cannotpurge ;
1052
1053            } else {
1054                $Job->canpurge ;
1055                $Job->stepdone ;
1056            }
1057
1058            $Job->start_ms
1059                and $Job->nextstep("Purging environment");
1060            return &EnqueueJob($Jid);
1061        }
1062
1063        # Check to return progression for long job
1064        $Job->clean_progress ;
1065
1066    ############################################################################
1067    # Step 12: Try to purge environment and delete job
1068    ############################################################################
1069    } elsif ( $Job->isstep(12) ) {
1070
1071        if ($Job->purge or $Job->getpurgetry == 0) {
1072            $Job->error(112, "ABTERM: Error while purging" . (($!)?" ($!)":""))
1073                unless $Job->purge ;
1074
1075            $Job->stop_ms; # Stop chrono
1076            $Job->setstate( DONE );
1077
1078            # Keep statistics from this job before delete it
1079            $self->KeepStats( $Job->getstatistics() );
1080
1081            # Anyway step is done
1082            $Job->stepdone ;
1083
1084            if ( $Job->error ) {
1085                # Check to do alert
1086                $Job->jobalert ;
1087
1088                # Now we can release memory
1089                $Job->clearjobsdone ;
1090
1091                &UPSTAT('ABTERM');
1092
1093            } else {
1094                &UPSTAT('RELEASEDAFPJOB');
1095            }
1096
1097            $self->ReturnInfoToClient( $Jid );
1098            &delete_job($Jid);
1099
1100            # Manage case used in tests & diagnostics to stop service after
1101            # a JOB_COUNTDOWN has been reached
1102            &UPSTAT('JOBDONE');
1103            if ($JOBS_COUNTDOWN) {
1104                if ($STATS{'JOBDONE'} >= $JOBS_COUNTDOWN) {
1105                    &Info("Countdown of $JOBS_COUNTDOWN jobs reached");
1106                    # Ask maintid to stop
1107                    kill 15, $maintid ;
1108
1109                } else {
1110                    my $ratio = int(($STATS{'JOBDONE'}*100) / $JOBS_COUNTDOWN) ;
1111                    unless ( ! $ratio or exists($STATS{'RATIO-'.$ratio})
1112                    or $ratio % 10) {
1113                        $STATS{'RATIO-'.$ratio} = localtime(time) ;
1114                        &Info("Process of $JOBS_COUNTDOWN jobs countdown: ".
1115                            sprintf('%02d%%',$ratio));
1116                    }
1117                }
1118            }
1119
1120            return $Job->error ? 0 : 1 ;
1121
1122        } else {
1123            # This can occur when zip file command is not finished and some
1124            # files to clean are still in a folder to purge
1125            $Job->jobdebug("Environment still not purged for Job $Job");
1126
1127            # Trying to purge later
1128        }
1129        &EnqueueJob($Jid);
1130
1131    ############################################################################
1132    # Bad job
1133    ############################################################################
1134    } else {
1135        $Job->error(5,"ABTERM: Don't know what to do");
1136
1137        # Check to do alert
1138        $Job->jobalert ;
1139        $self->ReturnInfoToClient( $Jid );
1140
1141        &delete_job($Jid);
1142
1143        &UPSTAT('ABTERM');
1144
1145        return 0 ;
1146    }
1147    return 1 ;
1148}
1149
1150sub ThreadInit {
1151    my $self = shift ;
1152
1153    # Initializes stat headers
1154    $self->{STATHEADERS} = [ qw(
1155        Date    DayTime     Jobname  DestId      SubStatus  Status
1156        Errors  A2P-Chrono  LockId   Job-Timing  ) ];
1157
1158    $self->{HAVE_STATS} = 1 ;
1159
1160    # Other initialization
1161    $self->{PENDING_REQUEST} = $self->{MANAGEJOBS_COUNT} = 0 ;
1162    $self->{REQUESTS_ARRAY}  = [] ;
1163    $self->{PENDING_LIMIT}   = 10 ;
1164    $self->{PENDING_TIMING}  = $self->{MANAGEJOBS_TIME} = [ &gettimeofday() ];
1165}
1166
1167sub InitUpdated {
1168    my $self = shift;
1169
1170    # Garanty pending limit is greater than zero and not excessive
1171    $self->{PENDING_LIMIT} = $MAXJOBS +
1172        ( $MAXJOBS ? $BACKEND_FACTOR * $MAXJOBS : 10 );
1173}
1174
1175sub Request {
1176    # Modified Request to limit the rate of request relative to the number
1177    # of Backends
1178    my $self = shift ;
1179
1180    &MAXSTAT('pending-REQUEST',$self->{PENDING_REQUEST});
1181
1182    return unless ( @_ or @{$self->{REQUESTS_ARRAY}} );
1183
1184    if ( @_ ) {
1185        &Debug("Caching '@_'")
1186            if ( ! $NO_SYSLOG_DEBUG and @{$self->{REQUESTS_ARRAY}} );
1187        push @{$self->{REQUESTS_ARRAY}}, \@_ ;
1188
1189        # Update current pending requests number
1190        $self->{PENDING_REQUEST} ++ unless ( $_[0] eq 'Listener' );
1191    }
1192
1193    while ( $self->{PENDING_REQUEST} < $self->{PENDING_LIMIT} or
1194    rand $self->{PENDING_LIMIT} < &tv_interval($self->{PENDING_TIMING})) {
1195        # The rand test allow sending new request even when there is none
1196        # This guaranty a request will be send sometime by just calling this sub
1197        # So we can call it regularily on a time basis to not forget any request
1198
1199        my $ref = shift @{$self->{REQUESTS_ARRAY}} ;
1200        last unless (defined($ref));
1201        next unless @{$ref} ;
1202
1203        my $req = &GetCom( comREQ , @{$ref} );
1204        &Debug("Requesting '$req'");
1205        $self->Answer( $req );
1206        &UPSTAT('ASKED-REQUESTS');
1207
1208        $self->{PENDING_TIMING} = [ &gettimeofday() ];
1209        $STATS{'LAST-PENDING-REQUEST'} = "@{$ref}" ;
1210
1211        # Keep state we have some new tasks for other jobs to do in the next
1212        # loops as we have been able to transmit pending request
1213        $self->{DONTSLEEP} ++ ;
1214    }
1215
1216    &MAXSTAT('cached-REQUEST',scalar(@{$self->{REQUESTS_ARRAY}}));
1217}
1218
1219sub DoBeforeQuit {
1220    my $self = shift ;
1221
1222    return 1 unless ( $SCAN_SPOOL and $KEEP_JOBSTATUS );
1223
1224    my $alerted = "" ;
1225
1226    &Debug("Still known jobs: " . join(' ',keys(%Jobs))) if (keys(%Jobs));
1227    # Get JIDs in SlowJobs list
1228    push @Jobs, map { shift @{$_} } @SlowJobs if @SlowJobs ;
1229    &Debug("Was about to process jobs: @Jobs") if @Jobs ;
1230    &Debug("Still active jobs: " . join(' ',&ActiveJobCount()))
1231        if (&ActiveJobCount());
1232
1233    # Set status to aborted to any existing job not in ABTERM
1234    foreach my $Jid ( keys(%Jobs) ) {
1235
1236        unless (&is_job($Jid)) {
1237            &delete_job($Jid);
1238            &Warn("Forgetting status of deleted $Jid job");
1239            next ;
1240        }
1241
1242        my $Job = $Jobs{$Jid} ;
1243
1244        # Only steps saying we are managing an AFP file are significant
1245        if ( $Job->isstep(0)
1246        or ($Job->isstep(1) and $Job->getstate == NOMOREFILE )) {
1247            &Debug("Job $Jid can't involve a conversion");
1248            &delete_job($Jid);
1249            next ;
1250        }
1251
1252        my $alert = "Job $Jid was aborted " ;
1253        if ($Job->isstep(1)) {
1254            if ( $Job->getanswer and -e $Job->getanswer ) {
1255                $alert .= "with a file to convert" ;
1256
1257            } else {
1258                $alert .= "without knowing if a file is to be converted" ;
1259            }
1260
1261        } elsif ($Job->isstep(2)) {
1262            $alert .= "during TeX conversion" ;
1263
1264        } elsif ($Job->isstep(3)) {
1265            $alert .= "during PDF conversion" ;
1266
1267        } elsif ($Job->isstep(4)) {
1268            $alert .= "during DVI conversion" ;
1269
1270        } elsif ($Job->isstep(5) or $Job->isstep(6)) {
1271            $alert .= "during PCL conversion" ;
1272
1273        } elsif ($Job->isstep(7)) {
1274            $alert .= "during PS conversion" ;
1275
1276        } elsif ($Job->isstep(8) or $Job->isstep(9)) {
1277            $alert .= "during Printing step" ;
1278
1279        } elsif ($Job->isstep(10) and $Job->geta2pjob()) {
1280            $alert .= "during Archiving step" ;
1281
1282        } elsif ($Job->isstep(10)) {
1283            $alert .= "waiting after sub jobs" ;
1284
1285        } elsif ($Job->isstep(11)) {
1286            $alert .= "during cleaning step" ;
1287
1288        } elsif ($Job->isstep(12)) {
1289            $alert .= "during purging step" ;
1290
1291        } elsif ($Job->error) {
1292            # If we came here, an alert should still has been sent for this job
1293            &Warn("Job $Jid in error aborted");
1294            &delete_job($Jid);
1295            next ;
1296
1297        } else {
1298            $alert .= "in an unknown state" ;
1299        }
1300
1301        $Job->jobstatus( 'A', { STATUS => 'KO', INFOS  => 'Aborted' } )
1302            or &Info("Can't update aborted '$Jid' status");
1303
1304        $Job->cleanjob ;
1305
1306        $JOB_DUMP_ON_ERROR = 1 ; # Force the dump of the job object
1307        $Job->jobalert( $alert );
1308        $alerted .= ($alerted?" ":"") . $Jid ;
1309
1310        &delete_job($Jid);
1311    }
1312
1313    &Debug($alerted?"Alerts sent for $alerted jobs":"No alert sent");
1314}
1315
1316&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
1317
13181;
Note: See TracBrowser for help on using the repository browser.