#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: JobManager.pm 3 2007-10-18 16:20:19Z guillaume $
#

package A2P::JobManager;

# Derived class from Thread.pm
use base qw(A2P::Thread);

use strict ;
use integer ;

use Time::HiRes qw( gettimeofday tv_interval );
use A2P::Globals ;
use A2P::Syslog ;
use A2P::Signal ;
use A2P::Job ;
use A2P::Thread ;
use A2P::Com qw( IsCom GetCom comJOB comASK comSAY comDONE comINF comREQ );

BEGIN {
    our $VERSION = sprintf "%s", q$Rev: 1399 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;

# Jobs management vars and subs
my @Jobs ;      # List of jobs waiting for an action
my @SlowJobs ;  # List of jobs waiting slowly for an action, each entry is
                # [ job id, [ enqueue time as gettimeofday list ], factor ]
my %Jobs ;      # List of current jobs objects, referenced by their current ID
my %Cancels ;   # List of cancelled jobs

# is_job( $jid )
# Returns 1 when $jid is defined and %Jobs holds for it a reference whose
# class name is exactly A2P::Job, 0 otherwise.
sub is_job {
    return defined($_[0])
        ? ( ref($Jobs{$_[0]}) =~ /^A2P::Job$/ ? 1 : 0 )
        : 0 ;
}

# JobCount()
# Returns the ids (their number in scalar context) of the known jobs which
# are not clones (empty geta2pjob), not in abterm, not splitted, not cancelled
# and which have not reached the step from which they are no longer counted:
# step 11 when $CLEAN_OPTIMIZATION is set, step 13 otherwise.
sub JobCount {
    # The threshold must be computed apart: written inline as
    # "getstep < $CLEAN_OPTIMIZATION ? 11 : 13", the comparison binds tighter
    # than ?: so the test was always true and the step was never checked
    my $laststep = $CLEAN_OPTIMIZATION ? 11 : 13 ;

    return grep {
        defined($Jobs{$_}) and $Jobs{$_}->geta2pjob eq ""
        and $Jobs{$_}->is_not_abterm
        # Don't count splitted jobs as they are slower to handle
        and ! $Jobs{$_}->is_splitted
        and $Jobs{$_}->getstep < $laststep
        and ! exists($Cancels{$_})
    } keys(%Jobs);
}

# ActiveJobCount()
# Returns the ids (their number in scalar context) of the known jobs which
# are not clones (empty geta2pjob) and not in abterm.
sub ActiveJobCount {
    return grep {
        defined($Jobs{$_}) and $Jobs{$_}->geta2pjob eq ""
        and $Jobs{$_}->is_not_abterm
    } keys(%Jobs);
}

# delete_job( $jid )
# Forgets the job $jid. Nothing is done when $jid is not a known job. When
# the job is not a clone (geta2pjob is false), $jid and any "$jid-<digits>"
# id are also removed from both waiting lists and the clone objects
# it references are deleted first.
sub delete_job {
    my $jid = shift ;

    return unless (&is_job($jid));

    # Check if we are deleting an AFP job
    unless ( $Jobs{$jid}->geta2pjob ) {
        # Empty waiting jobs list from the Jids, the id is quoted so any regex
        # metacharacter it contains is matched literally
        my $jidmatch = qr/^\Q$jid\E(-\d+){0,1}$/ ;
        @Jobs     = grep { $_      !~ $jidmatch } @Jobs ;
        @SlowJobs = grep { $_->[0] !~ $jidmatch } @SlowJobs ;

        # Delete clone objects before afpjob
        delete @Jobs{ $Jobs{$jid}->getclones, $Jobs{$jid}->getfinishedclones };
    }

    delete $Jobs{$jid} ;
}

# EnqueueJob( $jid )
# Appends $jid to the list of jobs waiting for an action, unless it is not
# a known job or it is already in that list.
sub EnqueueJob {
    my $jid = shift ;

    return unless ( &is_job($jid) and ! grep { $_ eq $jid } @Jobs );

    push @Jobs , $jid ;
}

# SlowEnqueueJob( $jid [, $factor ] )
# Appends $jid to the slow waiting list with the current time and a factor
# (1 by default, limited to 10), unless it is not a known job or it is
# already in that list.
sub SlowEnqueueJob {
    my $jid = shift ;

    return unless ( &is_job($jid) and ! grep { $_->[0] eq $jid } @SlowJobs );

    my $factor = shift || 1 ;

    &UPSTAT('SLOW-ENQUEUED');

    $factor = 10 unless ( $factor < 10 );

    &MAXSTAT('SLOW-QUEUE-FACTOR',$factor);

    push @SlowJobs, [ $jid, [ &gettimeofday() ], $factor ] ;
}

################################################################################
##                            Job Manager code                                ##
################################################################################

# Do( \$request )
# Handles one received request:
#  - a comJOB message for a known job is either an information (comINF)
#    forwarded to the client or kept in the job, or an answer stored in the
#    job before ManageJobs is called for it
#  - a START request enables spool scanning when $SCAN_SPOOL is set
#  - a comASK message holding a comJOB is either a cancel request or the
#    creation of a new job asked by a client
# Any other request is reported through ThisError.
# Returns the CancelJob result for a cancel request, the job id when a job
# can't be created, otherwise the handling result (never false when a known
# job was addressed).
sub Do {
    my $self = shift ;
    my $ref  = shift ;
    my $Ret  = 0 ;

    my @Job = &IsCom( comJOB , $$ref );

    if ( @Job == 2 ) {
        my ( $Jid , $Answer ) = @Job ;
        &Debug("Received request for job '$Jid'");

        if (defined($Jobs{$Jid})) {
            if ( &IsCom( comINF , $Answer ) ) {
                &UPSTAT('GET-COMINF');
                my @infos = &IsCom( comINF , $Answer ) ;
                my $info = join(": ",@infos);

                # This information should return to client immediately
                if ( $Jobs{$Jid}->getClientRef() ) {
                    # This is only some information during job processing
                    $Jobs{$Jid}->jobdebug("Saying '$info' to " .
                        $Jobs{$Jid}->getClientRef());
                    $self->Request( Listener =>
                        &GetCom( comSAY , $Jobs{$Jid}->getClientRef() => $info )
                    );

                } elsif ( $infos[0] =~ /^ABTERM\s(\d+)$/i ) {
                    $Jobs{$Jid}->error( $1, "ABTERM: " . $infos[1] );

                } else {
                    $Jobs{$Jid}->jobinfo($info);
                }

            } else {
                # This is an expected answer
                if ( $Answer =~ /^\d+$/ ) { # is_numeric
                    $Jobs{$Jid}->setstate($Answer);

                } else {
                    $Jobs{$Jid}->setanswer($Answer);
                }

                # Update current pending requests number
                if ($self->{PENDING_REQUEST}) {
                    $self->{PENDING_REQUEST} -- ;
                    &UPSTAT('pending-REQUEST',-1);
                }

                # Processing for this job
                $Ret = $self->ManageJobs($Jid);
            }

            $Ret ++ unless $Ret ;

        } else {
            &Warn("Don't know anything of $Jid job in '$$ref' request");
        }

    } elsif ( $$ref =~ m|START|i ) {
        if ( $SCAN_SPOOL ) {
            &Debug("Starting to get files from spool");
            $do_file ++ ;
        }
        $Ret = $self->Answer('STARTED');

    # Is this a question ?
    } elsif ( my @Ask = &IsCom( comASK , $$ref ) ) {
        my $name = $Ask[0] ;
        &Debug("Received question '$Ask[1]' from $name");

        if ( @Job = &IsCom( comJOB , $Ask[1] )) {
            # Prepare default returned value
            my $thisjob = $Job[0] ;

            # Check if it's a cancel request
            if ( $Job[1] =~ /^cancel$/i ) {
                &Debug("Got cancelling request for $thisjob job");
                return $self->CancelJob( $name, $thisjob );
            }

            # Finally, create an a2p job and enqueue it in our job list
            my $job = A2P::Job->new( $thisjob , $name );

            if ($job) {
                $job->setstate(0);
                $job->setanswer($Job[1]);
                $job->setstep(0);
                $job->start_ms and $job->nextstep("Got file to convert");

                # Starting new job
                $Jobs{$thisjob} = $job ;
                $Ret = $self->ManageJobs($thisjob);

            } else {
                &Debug("Reporting error to $name");
                $self->Request( Listener =>
                    &GetCom( comSAY , $name =>
                        'ABORTING RESTART ON RUNNING JOB' ) );
                $self->Request( Listener =>
                    &GetCom( comSAY , $name =>
                        &GetCom( comDONE , '' => "JOBSTATUS: KO" ) ));
                $Ret = $thisjob ;
            }

        } else {
            $self->ThisError("[DEV] Unsupported '$Ask[1]' question from $name");
        }

    } else {
        $self->ThisError("Got bad request '$$ref'");
    }

    return $Ret ;
}

# CancelJob( $name, $thisjob )
# Handles the cancel request sent by client $name for job $thisjob and always
# answers to the client through the Listener with a status line followed by
# a comDONE "CANCEL: OK" or "CANCEL: KO" message.
# Returns $thisjob.
sub CancelJob {
    my $self    = shift ;
    my $name    = shift ;
    my $thisjob = shift ;

    if (&is_job($thisjob)) {
        &Info("trying to cancel $thisjob job as asked by $name");

        if ( $Cancels{$thisjob} = $Jobs{$thisjob}->cancel ) {
            $Cancels{$thisjob}->jobinfo("Cancel is pending");
            $self->Request( Listener =>
                &GetCom( comSAY , $name => 'CANCEL PENDING' ) );
            $self->Request( Listener =>
                &GetCom( comSAY , $name =>
                    &GetCom( comDONE , '', "CANCEL: OK" ) ));

        } else {
            # Still delete this job if nothing more is expected
            &delete_job($thisjob);
            delete $Cancels{$thisjob} ;
            $self->Request( Listener =>
                &GetCom( comSAY , $name => 'CANCEL NOTHING OR TOO LATE' ) );
            $self->Request( Listener =>
                &GetCom( comSAY , $name =>
                    &GetCom( comDONE , '', "CANCEL: KO" ) ));
        }

    } elsif (exists($Cancels{$thisjob})) {
        &Debug("Can't cancel still cancelled $thisjob job");
        $self->Request( Listener =>
            &GetCom( comSAY , $name => 'STILL CANCELLING JOB' ) );
        $self->Request( Listener =>
            &GetCom( comSAY , $name =>
                &GetCom( comDONE , '', "CANCEL: OK" ) ));

    } else {
        &Warn("Can't cancel no more managed $thisjob job");
        $self->Request( Listener =>
            &GetCom( comSAY , $name => 'ABORTING CANCEL ON UNAVAILABLE JOB' ) );
        $self->Request( Listener =>
            &GetCom( comSAY , $name =>
                &GetCom( comDONE , '', "CANCEL: KO" ) ));
    }

    return $thisjob ;
}

# Time of the last spool check done in GetRequest
my $lastcheck = [ &gettimeofday() ];

# GetRequest( $Requests )
# Main loop step: asks for a new spool check every $SPOOL_TIMER seconds,
# creates a new job when a check is pending and less than $MAXJOBS jobs are
# counted, sends cached requests, manages one waiting job and updates the
# active jobs statistics. DO_QUIT is set when $MAXJOBS is false and every
# remaining active job, if any, is cancelled.
# Returns 1 when $MUSTQUIT is set, the ReadSock($Requests) result otherwise.
sub GetRequest {
    my $self     = shift ;
    my $Requests = shift ;
    my @JOBS ;

    #######################
    # Check flags
    #######################
    {
        # Generate new job at SPOOL_TIME time
        no integer ;
        if ( &tv_interval($lastcheck) > $SPOOL_TIMER ) {
            $lastcheck = [ &gettimeofday() ];
            $do_file ++ ;
        }
    }

    if ( $do_file and $SCAN_SPOOL ) {
        @JOBS = &JobCount() ;
        $STATS{'COUNTED-JOBS'} = "@JOBS" ;

        if ( @JOBS < $MAXJOBS ) {
            # Create an a2p job and enqueue it in our job list
            my $job = A2P::Job->new ;

            # Enqueuing new job
            $Jobs{$job->getid} = $job ;
            &EnqueueJob( $job->getid );

            # Don't decrease do_file if many jobs are active as this means
            # we can burst job creation as AFP files should be available
            my $counted = &JobCount() ;
            $do_file -- unless ( $counted > 1 and $counted < $MAXJOBS );

        } else {
            &Debug("Skip new job creation as there's still enough job running");
            $do_file -- ;
        }
    }

    # Check to do some jobs and com for that jobs
    $self->{DONTSLEEP} = 0 ;
    $self->Request();
    $self->ManageJobs();

    @JOBS = &ActiveJobCount() ;
    $STATS{'ACTIVES-JOBS'} = "@JOBS" ;
    &MAXSTAT('ACTIVES-JOBS-COUNT',scalar(@JOBS));

    # Really quit when MAXJOBS has been set to 0 and there's no more job running
    $self->{DO_QUIT} = 1
        unless ( $MAXJOBS
            # Check if pending jobs are not cancelled
            or ( @JOBS and @JOBS != grep { exists($Cancels{$_}) } @JOBS ));

    return $MUSTQUIT ? 1 : $self->ReadSock($Requests) ;
}

# ReturnInfoToClient( $Jid )
# Does nothing when the job has no client reference. Otherwise sends to the
# client each pending information of the job and closes the conversation
# with a comDONE "JOBSTATUS: OK|KO" message when the job is at step 13, or
# at step 12 in DONE state.
# The caller must pass the id of a job still present in the jobs list.
sub ReturnInfoToClient {
    my ( $self, $Jid ) = @_ ;

    my $Job = $Jobs{$Jid} ;

    # Return if not a client job
    return unless $Job->getClientRef() ;

    # In case Job is requested by a client, send info to client
    &Debug("Checking if $Jid is a client to return to some info");
    my @info = $Job->getClientInfo ;
    if ( @info ) {
        # First element is the client reference, others are the informations
        my $Ref = shift @info ;

        while ( @info ) {
            my $info = shift @info ;
            $Job->jobdebug("Saying '$info' to $Ref");
            $self->Request( Listener => &GetCom( comSAY , $Ref => $info ) );
        }
    }

    # Check if we need to close conversation
    if ( $Job->isstep(13)
        or ( $Job->isstep(12) and $Job->getstate == DONE )) {
        my $Ref = $Job->getClientRef() ;
        $Job->unsetClientRef() ;
        $self->Request( Listener =>
            &GetCom( comSAY , $Ref =>
                &GetCom( comDONE , '' =>
                    "JOBSTATUS: " . ($Job->error()?'KO':'OK') ) ));
    }
}

# loopdebug()
# Logs the number of active jobs and the cached requests, then calls the
# common loop debugging.
sub loopdebug {
    my $self = shift ;

    &Debug("Current active jobs: " . &ActiveJobCount(),
        "Current cached requests:",@{$self->{REQUESTS_ARRAY}});

    $self->commonloopdebug();
}

# ManageJobs( [ $Jid ] )
# Processes the current step of one job: the job $Jid when given, otherwise
# the first slow job or the first waiting job. Each step checks the result of
# the previous request and sends the next one, skipping the steps the job
# doesn't require.
# Returns 1 when there's nothing to do or the step was handled, 0 when a job
# ends in error at step 12 or is in an unknown step, the Warn result when the
# id is unknown, and the waiting(1) or EnqueueJob result on the paths which
# return them directly.
#
# NOTE(review): the STEPn and JOBDONE labels are reached with goto from other
# branches of the if/elsif chain. perlfunc documents jumping into a construct
# with goto as deprecated, so this chain should be restructured before moving
# to a recent perl.
sub ManageJobs {
    my $self = shift ;
    my $Jid  = shift ;

    ####################################################################
    # Check if we can get next job from SlowJobs list
    ####################################################################
    my $slowfactor = 0 ;
    if ( @SlowJobs and ! defined($Jid) ) {
        my $slowref = shift @SlowJobs ;
        $Jid = $slowref->[0] ;

        # NOTE(review): under "use integer" $USLEEP / 1_000_000 is an integer
        # division, so this delay is 0 as soon as $USLEEP is lower than one
        # second - confirm if a fractional delay was expected here
        my $delay = $slowref->[2]++ ;
        $delay *= $USLEEP / 1_000_000 ;

        # tv_interval expects a reference to the gettimeofday list, passing
        # the flattened list makes it dereference plain numbers
        if ( &tv_interval( $slowref->[1] ) < $delay ) {
            push @SlowJobs, $slowref ;
            undef $Jid ;

        } else {
            # Set slowfactor in case we need it with SlowEnqueueJob
            $slowfactor = $slowref->[2] ;
        }
    }

    ##################################
    # Get one Job Id if not a new one
    ##################################
    $Jid = shift @Jobs unless (defined($Jid));
    return 1 unless $Jid ;

    return &Warn("$Jid is not in my jobs list") unless (&is_job($Jid));

    # Compute a management rate for internal statistics
    if ($self->{MANAGEJOBS_COUNT} ++
        and &tv_interval($self->{MANAGEJOBS_TIME}) > 10 ) {
        my $rate = $self->{MANAGEJOBS_COUNT}
            / &tv_interval($self->{MANAGEJOBS_TIME}) ;
        &MAXSTAT('MANAGEJOBS-RATE',sprintf("%.1f",$rate));
        $self->{MANAGEJOBS_COUNT} = 0 ;
        $self->{MANAGEJOBS_TIME} = [ &gettimeofday() ] ;
    }

    # As we have tasks to do, avoid to sleep on main loop
    $self->{DONTSLEEP} ++ unless ($self->{LASTJOB} eq $Jid);
    $self->{LASTJOB} = $Jid ;

    my $Job = $Jobs{$Jid} ;

    $self->ReturnInfoToClient( $Jid ) unless ($Job->isstep(12));

    # Check if job is in an agregation error case to shorten processing
    goto JOBDONE if ( $Job->badagregation and ! $Job->isstep(10) );

    ############################################################################
    # Step 0: Job just created, it need a file, so give it to SpoolManager
    ############################################################################
    if ( $Job->isstep(0) ) {
        $Job->jobdebug("Starting Job $Jid with a2p v" . A2P_RPM_VERSION );
        $Job->jobinfo("Checking one file from AFPSPOOL");
        $Job->start_ms and $Job->nextstep("Waiting for a filename");
        $self->Request( SpoolManager => $Job->request );

    ############################################################################
    # Step 1: A2P Job returned with a filename or with NOMOREFILE
    ############################################################################
    } elsif ( $Job->isstep(1) ) {
        $Job->stop_ms; # Stop chrono

        if ( $Job->getstate == NOMOREFILE ) {
            $Job->jobinfo("No file to convert");
            $Job->releaseinfo ;
            &delete_job($Jid);
            &UPSTAT('NOAFPJOB');

        } elsif ( $Job->getstate == 0 ) {
            $Job->releaseinfo ;

            # Have a file to convert
            my $file = $Job->setfile( $Job->getanswer );
            $Job->start_ms
                and $Job->nextstep("Converting '$file' to TeX format");
            $self->Request( Converter => $Job->toconverter );

            # Also say to SpoolManager its job is done and we manage now here
            # any information about the job and its AFP file
            $self->Request( SpoolManager => $Job->request( DONE ))
                if ( $SCAN_SPOOL and ! $Job->is_splitted ) ;

            &UPSTAT('STARTEDAFPJOB');

        } else {
            $Job->jobalert( $Job->getstate,
                "ABTERM: Can't get a file from AFPSPOOL");
            $MUSTQUIT ++ ;
        }

    ############################################################################
    # Step 2: A2P Job returned from a converter
    ############################################################################
    } elsif ( $Job->isstep(2) ) {
        if ( $Job->getstate ) {
            # Check if we need to split AFP file before stopping this step
            my $do_split = $Job->split_cmd ;
            if (defined($do_split)) {
                if ($do_split =~ /^\d+$/) {
                    if ($do_split) {
                        $Job->error( $do_split, "ABTERM: Splitting error");

                    } else {
                        # Not ready to split, so process later
                        &SlowEnqueueJob($Jid,$slowfactor);
                    }

                } else {
                    $self->Request( BackEnd => $do_split );
                }

                # Still return when no error found in splitting
                return 1 unless $Job->error ;
            }

            $Job->stop_ms; # Stop chrono

            # Wait for all TeX jobs to be done
            $Job->a2pjob_progress_init ;
            $Job->start_ms and $Job->setstep(10);

            # Initializes sequence computation
            $Job->sequence_ready();

            if ($Job->getstate == DONE) {
                $Job->jobinfo("Waiting after cloned job");

            } else {
                # Error
                $Job->error( $Job->getstate, "ABTERM: Bad conversion")
                    unless ( $Job->error >= 90 and $Job->error < 100 );
                $Job->jobinfo("Waiting any cloned job after conversion ABTERM");

                # Update files to clean
                $Job->setconversionerrorcleaning ;
            }

            &EnqueueJob($Jid);

        } else {
            # Have a converted file, create a new job starting at step 3
            # Or this is just a folder name to add in todelete list
            # Or also it can be a split event
            if ( -d $Job->getanswer ) {
                $Job->setjobdir( $Job->getanswer );
                return 1 ;
            }

            # Create a cloned job to continue steps
            my $job = $Job->newjob() ;

            # We reach a split event if job is undefined
            unless (defined($job)) {
                # Re-enqueue this job as it should be set DONE now
                $Job->setstate(DONE);
                &EnqueueJob($Jid);

                # Get the tag used to name the new AFP part
                my $newtag = $Job->{'SPLIT_TAG2'} ;

                # Then create an a2p job and enqueue it in our job list for the
                # same file but starting at a given position
                $job = A2P::Job->new ;
                $Jobs{$job->getid} = $job ;

                # Initialize job from splitted job
                $job->split_from( $Job );

                # Starting new job by enqueuing it
                $job->setstep(0);
                $job->start_ms
                    and $job->nextstep("New AFP to convert $newtag jobs" .
                        " from " . $job->getfilestart .
                        " pos in original AFP file");

                # Initialize jobstatus with AfpName to keep restarts in mind
                # as step 1 done as if it was set by SpoolManager
                my ( $AfpName ) = $Job->getfile =~ $AFPNAME_REGEX ;
                $job->jobstatus( 'o', {
                    AFP    => $AfpName.'.'.$newtag,
                    JOBID  => $job->getid,
                    STATUS => 'STARTING',
                    FILE   => $Job->getfile.'.'.$newtag,
                    INFOS  => 'Started'
                } )
                    or &Info("Can't initialize '".$job->getid."' status");

                &EnqueueJob($job->getid);

                # Still return after handling the split
                return 1 ;
            }

            &UPSTAT('STARTEDJOB');
            $Job->jobdebug("Cloned as job " . $job->getid . " to process " .
                $job->getfile );

            my $jid = $job->getid ;
            $Jobs{$jid} = $job ;

            # Update job status now to set the DESTID
            $job->jobstatus( '.', {
                DESTID => $job->getdestid ,
                JOBNAME=> $job->getjobname ,
                STATUS => "STARTED"
            })
                or &Info("Can't set job status at step 3");

            if ($job->do_pdf or $ONLY_DO_PDF) {
                $job->start_ms and $job->nextstep("Generating PDF " .
                    ( $job->getClientRef()
                        ? "'" . $job->getPdfName() . "'"
                        : "file" ));
                $self->Request( BackEnd => $job->pdf_cmd );

            } else {
                # Continue at step 4 with the clone as current job
                $Job = $job ;
                $Jid = $jid ;
                $Job->stepskip("No pdf file required, skipped");
                goto STEP4 ;
            }
        }

    ############################################################################
    # Step 3: Pdf file done or not needed, check to do DVI
    ############################################################################
    } elsif ( $Job->isstep(3) ) {
        if ($Job->do_pdf or $ONLY_DO_PDF) {
            # Still rename PDF to its final name
            $Job->rename_pdf ;

            $Job->stop_ms ; # Stop chrono

            # Check state
            if ( $Job->getstate != DONE ) {
                $Job->error( $Job->getstate, "ABTERM: Bad PDF generation");
                # Any error disable next processing, see A2P::Job->error

                # We must also set an error to disable any sequence processing
                $Job->error( 701, "Aborting the printing sequence" );

            } else {
                $Job->stepdone ;
            }

            # Finish immediately this job if needed
            if ( $ONLY_DO_PDF ) {
                $Job->setstep(10);
                goto JOBDONE ;
            }
        }

      STEP4:
        if ($Job->do_pcl or $Job->do_vpcl or $Job->do_ps) {
            $Job->start_ms and $Job->nextstep("Converting to DVI format");
            $self->Request( BackEnd => $Job->dvi_cmd );

        } else {
            $Job->stepskip("No dvi format required, skipped");
            # Nothing to print so jump directly to step 10
            $Job->setstep(9);
            goto STEP10 ;
        }

    ############################################################################
    # Step 4: DVI file done or not needed, check to do PCL
    ############################################################################
    } elsif ( $Job->isstep(4) ) {
        if ($Job->do_pcl or $Job->do_vpcl or $Job->do_ps) {
            $Job->stop_ms ; # Stop chrono

            # Check state
            if ( $Job->getstate != DONE) {
                $Job->error($Job->getstate,"ABTERM: Bad DVI format conversion");
                # Any error disable next processing, see A2P::Job->error

            } else {
                $Job->stepdone ;
            }
        }

      STEP5:
        if ($Job->do_pcl) {
            $Job->start_ms and $Job->nextstep("Generating PCL format");
            $self->Request( BackEnd => $Job->pcl_cmd );

        } else {
            $Job->stepskip("No pcl format required, skipped");
            goto STEP6 ;
        }

    ############################################################################
    # Step 5: PCL file done or not needed, check to do VPCL
    ############################################################################
    } elsif ( $Job->isstep(5) ) {
        if ($Job->do_pcl) {
            $Job->valid_pcl; # Check PCL file

            $Job->stop_ms ; # Stop chrono

            # Check state
            if ( $Job->getstate != DONE) {
                $Job->error($Job->getstate,"ABTERM: Bad PCL format generation");
                # Any error disable next processing, see A2P::Job->error

            } else {
                $Job->stepdone ;
            }
        }

      STEP6:
        if ($Job->do_vpcl) {
            my $vpcl_cmd = $Job->vpcl_cmd ;

            # Only ask to generate vpcl if we need a different pcl than previous
            if ( $vpcl_cmd ) {
                $Job->start_ms
                    and $Job->nextstep("Generating PCL format for validation");
                $self->Request( BackEnd => $vpcl_cmd );

            } else {
                $Job->stepskip("Validation PCL still generated " .
                    "during previous step");
                goto STEP7 ;
            }

        } else {
            $Job->stepskip("No validation pcl file required, skipped");
            goto STEP7 ;
        }

    ############################################################################
    # Step 6: VPCL file done or not needed, check to do PS
    ############################################################################
    } elsif ( $Job->isstep(6) ) {
        if ($Job->do_vpcl) {
            $Job->valid_vpcl; # Check PCL file

            $Job->stop_ms ; # Stop chrono

            # Check state
            if ( $Job->getstate != DONE) {
                $Job->error( $Job->getstate,
                    "ABTERM: Bad PCL format for validation");
                # Any error disable next processing, see A2P::Job->error

            } else {
                $Job->stepdone ;
            }
        }

      STEP7:
        if ($Job->do_ps and ! $Job->do_pcl) {
            $Job->start_ms and $Job->nextstep("Generating PS format");
            $self->Request( BackEnd => $Job->ps_cmd );

        } else {
            $Job->stepskip("No PS file required, skipped");
            $Job->jobinfo("DESTID is " . $Job->getdestid )
                if ($Job->getdestid);
            goto STEP8 ;
        }

        $Job->jobinfo("DESTID is " . $Job->getdestid ) if ($Job->getdestid);

        &EnqueueJob($Jid);

    ############################################################################
    # Step 7: PS file done or not needed, check to do PRINT
    ############################################################################
    } elsif ( $Job->isstep(7) ) {
        if ($Job->do_ps and ! $Job->do_pcl and $Job->stepnotdone) {
            $Job->stop_ms ; # Stop chrono

            # Check state
            if ( $Job->getstate != DONE) {
                $Job->error( $Job->getstate, "ABTERM: Bad PS format");
                # Any error disable next processing, see A2P::Job->error

            } else {
                $Job->stepdone ;
            }
        }

      STEP8:
        my $later = 0 ;

        # do_lpr must be in sequence
        if ($Job->do_lpr(\$later)) {
            $Job->start_ms and $Job->nextstep(
                ( $Job->next_in_sequence ? "Agregating for " : "Printing to " )
                . $Job->printer );
            $self->Request( BackEnd => $Job->lpr_cmd );

        # Just enqueue again this job step if not its turn
        } elsif ($later) {
            $Job->jobdebug("Waiting for print sequence");

            # Set it waiting if the sequence is known as this means this is not
            # our turn to print
            if ($Job->sequence_ready()) {
                return $Job->waiting(1);

            } else {
                &EnqueueJob($Jid);
            }

        } else {
            if ($Job->error) {
                # Need to check sequence if we pass here after a error in the
                # middle of a sequence
                &EnqueueJob( $Job->get_next_authorized )
                    if ( $Job->remove_from_sequence_but_authorize_next );
            }

            $Job->stepskip("No print required, skipped");
            goto STEP9 ;
        }

        # Here the job is not waiting
        $Job->waiting(0);

    ############################################################################
    # Step 8: Print done or not needed, check to do VPRINT
    ############################################################################
    } elsif ( $Job->isstep(8) ) {
        if ($Job->do_lpr and $Job->stepnotdone) {
            $Job->stop_ms ; # Stop chrono

            # Print is in the print spool we can authorize next print
            # in the sequence
            &EnqueueJob( $Job->get_next_authorized )
                if ( $Job->authorize_next_print and $Job->next_is_waiting );

            # Check state
            if ( $Job->getstate != DONE ) {
                $Job->error( $Job->getstate, "ABTERM: Print Error");
                # Any error disable next processing, see A2P::Job->error

            } else {
                $Job->stepdone ;
            }
        }

      STEP9:
        my $later = 0 ;

        if ($Job->do_vlpr(\$later)) {
            $Job->start_ms and $Job->nextstep(
                ( $Job->next_in_sequence
                    ? "Agregating validation for "
                    : "Printing for validation to " )
                . $Job->vprinter );
            $self->Request( BackEnd => $Job->vlpr_cmd );

        # Just enqueue again this job step if not its turn
        } elsif ($later) {
            $Job->jobdebug("Waiting for validation print sequence");

            # Set it waiting if the sequence is known as this means this is not
            # our turn to print
            if ($Job->sequence_ready()) {
                return $Job->waiting(1);

            } else {
                &EnqueueJob($Jid);
            }

        } else {
            if ($Job->error) {
                # Need to check sequence if we pass here after a error in the
                # middle of a sequence
                &EnqueueJob( $Job->get_next_authorized )
                    if ( $Job->remove_from_sequence_but_authorize_next );
            }

            $Job->stepskip("No validation print required, skipped");
            goto STEP10 ;
        }

        $Job->waiting(0);

    ############################################################################
    # Step 9: VPrint done or not needed, check to archive
    ############################################################################
    } elsif ( $Job->isstep(9) ) {
        if ($Job->do_vlpr) {
            $Job->stop_ms ; # Stop chrono

            # Print is in the print spool we can authorize next print
            # in the sequence
            &EnqueueJob( $Job->get_next_authorized )
                if ( $Job->authorize_next_print and $Job->next_is_waiting );

            # Check state
            if ( $Job->getstate != DONE ) {
                $Job->error( $Job->getstate, "ABTERM: Validation Print error");
                # Any error disable next processing, see A2P::Job->error

            } else {
                $Job->stepdone ;
            }
        }

      STEP10:
        if ($Job->do_arch) {
            $Job->start_ms and $Job->nextstep("Archiving PDF file");

            if ( $Job->get_arch_file ) {
                $self->Request( Archiver => $Job->get_arch_file );

            } else {
                $Job->error( 110,
                    "Need to archive something but not arch base available");
                goto JOBDONE ;
            }

        } elsif ($Job->do_eservice) {
            $Job->start_ms and $Job->nextstep("Transmitting PDF file to '" .
                $Job->eservice_name . "'");

            if ( $Job->eservice_request ) {
                $self->Request( EService => $Job->eservice_request );

            } else {
                $Job->error( 110,
                    "Need to transmit something but no content available");
                goto JOBDONE ;
            }

        } else {
            $Job->stepskip("No archiving required, skipped");
            goto JOBDONE ;
        }

    ############################################################################
    # Step 10: Waiting for cloned job to be processed
    ############################################################################
    } elsif ( $Job->isstep(10) ) {
      JOBDONE:
        my $a2pjob = $Job->geta2pjob ;

        #-------------------------------------------------
        # It's cloned job waited after by its A2P job
        #-------------------------------------------------
        if (length($a2pjob)) {
            if ($Job->do_arch) {
                $Job->stop_ms ; # Stop chrono

                # Check state
                if ( $Job->getstate != DONE ) {
                    $Job->error( $Job->getstate, "ABTERM: Archiver error");
                    # Any error disable next processing, see A2P::Job->error

                } else {
                    $Job->stepdone ;
                }

                # Check if we need to pass again to step 10 for E-Service...
                if ($Job->do_eservice) {
                    $Job->setstep(9) ;
                    goto STEP10 ;
                }

            } elsif ($Job->do_eservice) {
                $Job->stop_ms ; # Stop chrono

                # Check state
                if ( $Job->getstate != DONE ) {
                    $Job->error( $Job->getstate, "ABTERM: '" .
                        $Job->eservice_name . "' E-Service error");
                    # Any error disable next processing, see A2P::Job->error

                } else {
                    $Job->stepdone ;
                }
            }

            if (defined($Jobs{$a2pjob})) {
                # Check error to update a2pjob error state
                $Jobs{$a2pjob}->error( $Job->error,
                    "Cloned job $Jid gives error on step " .
                    $Job->getsteperror . ": " . join(', ',$Job->geterrors))
                    if ($Job->error);

                # Update parent job even if any other clone produced error
                $Jobs{$a2pjob}->todelete($Job->get2delete);
                $Jobs{$a2pjob}->toclean($Job->get2clean);
                $Jobs{$a2pjob}->tomove($Job->get2move);

            } else {
                $Job->error( 10 , "AFPFile job object has been released" );
            }

            # Keep statistics from this job before delete it
            $Job->setstate( $Job->error ? $Job->error : DONE );
            $self->KeepStats( $Job->getstatistics() );

            # Now we can destroy the clone as we kept all needed information
            $self->ReturnInfoToClient( $Jid );

            # Freeing job and sanity
            $Jobs{$a2pjob}->jobdone($Job) if (defined($Jobs{$a2pjob}));
            $Job->freeparent ;
            &delete_job($Jid);

            &UPSTAT('RELEASEDJOB');

        #--------------------------------------------
        # It's a waiting A2P job
        #--------------------------------------------
        } else {
            if ( $Job->nomorejob ) {
                #
                # Cloned jobs are finished
                #
                $Job->stop_ms; # Stop chrono
                $Job->stepdone ;

                # Release memory of clones unless error are present to keep a
                # dump of them in sent alert
                $Job->clearjobsdone unless $Job->error ;

                $Job->start_ms and $Job->nextstep("Cleaning environment");

                # Preparing to convert another file immediately, not waiting
                # the previous job to be cleaned
                $do_file ++ if $CLEAN_OPTIMIZATION ;

            } else {
                $Job->a2pjob_progress ;
            }

            &EnqueueJob($Jid);
        }

    ############################################################################
    # Step 11: Environment is clean, next is to purge
    ############################################################################
    } elsif ( $Job->isstep(11)) {
        # Check to send a move command
        if ($Job->do_move_cmd) {
            my $cmd = $Job->move_files_cmd || "" ;

            # Send now any non empty move command if not still done
            if ($cmd){
                $self->Request( BackEnd => $cmd );
                &UPSTAT('MOVE_CMDS');

            } else {
                &EnqueueJob($Jid);
                &UPSTAT('BAD_MOVE_CMDS');
            }

        } elsif ( $Job->do_clean_cmd ) {
            my $cmd = $Job->clean_cmd || "" ;

            # Send now any clean command if not still done
            if ($cmd) {
                $self->Request( BackEnd => $cmd );
                &UPSTAT('CLEAN_CMDS');

            } else {
                &EnqueueJob($Jid);
                &UPSTAT('BAD_CLEAN_CMDS');
            }

        } else {
            # Cleaning is finished
            $Job->stop_ms; # Stop chrono

            if ( $Job->getstate != DONE ) {
                $Job->error( $Job->getstate, "ABTERM: Error while cleaning");
                $Job->cannotpurge ;

            } else {
                $Job->canpurge ;
                $Job->stepdone ;
            }

            $Job->start_ms and $Job->nextstep("Purging environment");

            return &EnqueueJob($Jid);
        }

        # Check to return progression for long job
        $Job->clean_progress ;

    ############################################################################
    # Step 12: Try to purge environment and delete job
    ############################################################################
    } elsif ( $Job->isstep(12) ) {
        if ($Job->purge or $Job->getpurgetry == 0) {
            $Job->error(112, "ABTERM: Error while purging" .
                (($!)?" ($!)":"")) unless $Job->purge ;

            $Job->stop_ms; # Stop chrono

            $Job->setstate( DONE );

            # Keep statistics from this job before delete it
            $self->KeepStats( $Job->getstatistics() );

            # Anyway step is done
            $Job->stepdone ;

            if ( $Job->error ) {
                # Check to do alert
                $Job->jobalert ;

                # Now we can release memory
                $Job->clearjobsdone ;

                &UPSTAT('ABTERM');

            } else {
                &UPSTAT('RELEASEDAFPJOB');
            }

            $self->ReturnInfoToClient( $Jid );
            &delete_job($Jid);

            # Manage case used in tests & diagnostics to stop service after
            # a JOB_COUNTDOWN has been reached
            &UPSTAT('JOBDONE');
            if ($JOBS_COUNTDOWN) {
                if ($STATS{'JOBDONE'} >= $JOBS_COUNTDOWN) {
                    &Info("Countdown of $JOBS_COUNTDOWN jobs reached");
                    # Ask maintid to stop
                    kill 15, $maintid ;

                } else {
                    my $ratio = int(($STATS{'JOBDONE'}*100) / $JOBS_COUNTDOWN) ;

                    # Only report once each reached multiple of 10 percent
                    unless ( ! $ratio or exists($STATS{'RATIO-'.$ratio})
                        or $ratio % 10) {
                        $STATS{'RATIO-'.$ratio} = localtime(time) ;
                        &Info("Process of $JOBS_COUNTDOWN jobs countdown: ".
                            sprintf('%02d%%',$ratio));
                    }
                }
            }

            return $Job->error ? 0 : 1 ;

        } else {
            # This can occur when zip file command is not finished and some
            # files to clean are still in a folder to purge
            # The job id is logged here, the job object was interpolated before
            $Job->jobdebug("Environment still not purged for Job $Jid");
            # Trying to purge later
        }

        &EnqueueJob($Jid);

    ############################################################################
    # Bad job
    ############################################################################
    } else {
        $Job->error(5,"ABTERM: Don't know what to do");

        # Check to do alert
        $Job->jobalert ;

        $self->ReturnInfoToClient( $Jid );
        &delete_job($Jid);

        &UPSTAT('ABTERM');

        return 0 ;
    }

    return 1 ;
}

# ThreadInit()
# Initializes the statistics headers and the request and job management
# counters and timers of this thread object.
sub ThreadInit {
    my $self = shift ;

    # Initializes stat headers
    $self->{STATHEADERS} = [ qw(
        Date DayTime Jobname DestId SubStatus Status Errors A2P-Chrono
        LockId Job-Timing
    ) ];
    $self->{HAVE_STATS} = 1 ;

    # Other initialization
    $self->{PENDING_REQUEST} = $self->{MANAGEJOBS_COUNT} = 0 ;
    $self->{REQUESTS_ARRAY}  = [] ;
    $self->{PENDING_LIMIT}   = 10 ;
    $self->{PENDING_TIMING}  = $self->{MANAGEJOBS_TIME} = [ &gettimeofday() ];
}

# InitUpdated()
# Recomputes the pending requests limit from $MAXJOBS and $BACKEND_FACTOR,
# it is 10 when $MAXJOBS is false.
sub InitUpdated {
    my $self = shift;

    # Guarantee pending limit is greater than zero and not excessive
    $self->{PENDING_LIMIT} =
        $MAXJOBS + ( $MAXJOBS ? $BACKEND_FACTOR * $MAXJOBS : 10 );
}

# Request( [ $target, @args ] )
# Modified Request to limit the rate of request relative to the number
# of Backends: a given request is first cached, then cached requests are sent
# while the pending requests number is under the limit. Any request but the
# ones for the Listener counts as pending.
sub Request {
    my $self = shift ;

    &MAXSTAT('pending-REQUEST',$self->{PENDING_REQUEST});

    return unless ( @_ or @{$self->{REQUESTS_ARRAY}} );

    if ( @_ ) {
        &Debug("Caching '@_'")
            if ( ! $NO_SYSLOG_DEBUG and @{$self->{REQUESTS_ARRAY}} );

        # Cache a copy of the arguments so the kept request doesn't stay
        # bound to the variables of the caller
        push @{$self->{REQUESTS_ARRAY}}, [ @_ ] ;

        # Update current pending requests number
        $self->{PENDING_REQUEST} ++ unless ( $_[0] eq 'Listener' );
    }

    while ( $self->{PENDING_REQUEST} < $self->{PENDING_LIMIT}
        or rand( $self->{PENDING_LIMIT} )
            < &tv_interval($self->{PENDING_TIMING}) ) {
        # The rand test allow sending new request even when there is none
        # This guaranty a request will be send sometime by just calling this sub
        # So we can call it regularly on a time basis to not forget any request
        my $ref = shift @{$self->{REQUESTS_ARRAY}} ;
        last unless (defined($ref));
        next unless @{$ref} ;

        my $req = &GetCom( comREQ , @{$ref} );
        &Debug("Requesting '$req'");
        $self->Answer( $req );
        &UPSTAT('ASKED-REQUESTS');
        $self->{PENDING_TIMING} = [ &gettimeofday() ];
        $STATS{'LAST-PENDING-REQUEST'} = "@{$ref}" ;

        # Keep state we have some new tasks for other jobs to do in the next
        # loops as we have been able to transmit pending request
        $self->{DONTSLEEP} ++ ;
    }

    &MAXSTAT('cached-REQUEST',scalar(@{$self->{REQUESTS_ARRAY}}));
}

# DoBeforeQuit()
# Returns 1 at once unless $SCAN_SPOOL and $KEEP_JOBSTATUS are set. Otherwise
# each remaining job is deleted, after its status has been set as aborted and
# an alert has been sent when the job was really managing an AFP file.
sub DoBeforeQuit {
    my $self = shift ;

    return 1 unless ( $SCAN_SPOOL and $KEEP_JOBSTATUS );

    my $alerted = "" ;

    &Debug("Still known jobs: " . join(' ',keys(%Jobs))) if (keys(%Jobs));

    # Get JIDs in SlowJobs list, entries are read without being modified
    push @Jobs, map { $_->[0] } @SlowJobs if @SlowJobs ;

    &Debug("Was about to process jobs: @Jobs") if @Jobs ;

    &Debug("Still active jobs: " . join(' ',&ActiveJobCount()))
        if (&ActiveJobCount());

    # Set status to aborted to any existing job not in ABTERM
    foreach my $Jid ( keys(%Jobs) ) {
        # A job can have been deleted with its parent in a previous loop
        unless (&is_job($Jid)) {
            &delete_job($Jid);
            &Warn("Forgetting status of deleted $Jid job");
            next ;
        }

        my $Job = $Jobs{$Jid} ;

        # Only steps saying we are managing an AFP file are significant
        if ( $Job->isstep(0)
            or ($Job->isstep(1) and $Job->getstate == NOMOREFILE )) {
            &Debug("Job $Jid can't involve a conversion");
            &delete_job($Jid);
            next ;
        }

        my $alert = "Job $Jid was aborted " ;

        if ($Job->isstep(1)) {
            if ( $Job->getanswer and -e $Job->getanswer ) {
                $alert .= "with a file to convert" ;

            } else {
                $alert .= "without knowing if a file is to be converted" ;
            }

        } elsif ($Job->isstep(2)) {
            $alert .= "during TeX conversion" ;

        } elsif ($Job->isstep(3)) {
            $alert .= "during PDF conversion" ;

        } elsif ($Job->isstep(4)) {
            $alert .= "during DVI conversion" ;

        } elsif ($Job->isstep(5) or $Job->isstep(6)) {
            $alert .= "during PCL conversion" ;

        } elsif ($Job->isstep(7)) {
            $alert .= "during PS conversion" ;

        } elsif ($Job->isstep(8) or $Job->isstep(9)) {
            $alert .= "during Printing step" ;

        } elsif ($Job->isstep(10) and $Job->geta2pjob()) {
            $alert .= "during Archiving step" ;

        } elsif ($Job->isstep(10)) {
            $alert .= "waiting after sub jobs" ;

        } elsif ($Job->isstep(11)) {
            $alert .= "during cleaning step" ;

        } elsif ($Job->isstep(12)) {
            $alert .= "during purging step" ;

        } elsif ($Job->error) {
            # If we came here, an alert should still has been sent for this job
            &Warn("Job $Jid in error aborted");
            &delete_job($Jid);
            next ;

        } else {
            $alert .= "in an unknown state" ;
        }

        $Job->jobstatus( 'A', { STATUS => 'KO', INFOS => 'Aborted' } )
            or &Info("Can't update aborted '$Jid' status");

        $Job->cleanjob ;

        $JOB_DUMP_ON_ERROR = 1 ; # Force the dump of the job object
        $Job->jobalert( $alert );

        $alerted .= ($alerted?" ":"") . $Jid ;

        &delete_job($Jid);
    }

    &Debug($alerted?"Alerts sent for $alerted jobs":"No alert sent");
}

&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");

1;