# # Copyright (c) 2004-2007 - Consultas, PKG.fr # # This file is part of A2P. # # A2P is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # A2P is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with A2P; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # # $Id: AfpJob.pm 3 2007-10-18 16:20:19Z guillaume $ # # Class to manage an AFP Job # package A2P::AfpJob ; use strict; use Time::HiRes qw(gettimeofday tv_interval) ; use Data::Dumper ; use A2P::Globals ; use A2P::Syslog ; use A2P::Tools qw( ShortID ); use A2P::JobStatus qw( a2pjobstate get_status ) ; use A2P::Status ; use A2P::DB qw( job_queue_update get_next_afpjob is_job_status_updated a2p_db_connect a2p_db_connected get_serviceid_info a2p_db_avoid_disconnect_in_forked ); use A2P::Com qw( GetCom comJOB ); use A2P::Client ; BEGIN { our $VERSION = sprintf "%s", q$Rev: 1188 $ =~ /(\d[0-9.]+)\s+/ ; } our $VERSION ; my $Infos ; my $event ; # Private used to manage event our ( $notdone , $busy ) ; # Don't indent for freeze API $Data::Dumper::Indent = 0 ; $Data::Dumper::Purity = 1 ; $Data::Dumper::Terse = 1 ; # Private constants sub _chained_request_qr { qr/^[a-z:]+$/ } sub new { my $class = shift ; my $mode = shift || 0 ; my $self = { MODE => $mode, event => 0 } ; bless $self , $class ; #&Debug("new ".__PACKAGE__." object v$VERSION mode $mode created"); return $self ; } # Additionnal authorized actions by mode my $actions_by_mode = [ # User mode [ 'cancel', #'pause', #'resume', #'ticket', 'update' ], # Advanced mode [ ] ]; sub actions { # Return a list of possible actions by mode for this job # The list is just an API binding list that can used, the caller should # known how to use them with A2P::AfpJob objects # start, validate & delete actions are implicit my $self = shift ; return @{$actions_by_mode->[$self->mode]} ; } sub afpspool { # Set/return the AFPSPOOL for this job my $self = shift ; $self->{AFPSPOOL} = shift if (@_ and $_[0]); return exists($self->{AFPSPOOL}) ? $self->{AFPSPOOL} : $AFPSPOOL ; } sub bad_request { # Set/return the BAD REQUEST status my $self = shift ; $self->{BAD_REQUEST} = shift if @_ ; return defined($self->{BAD_REQUEST}) ? $self->{BAD_REQUEST} : 0 ; } sub cancel { # Cancel this job my $self = shift ; # Check if it's an event request return $event > 0 ? 1 : 0 if ($event = $self->is_event_request($_[0],'cancel')); # Initialization my $name = $self->name ; # Check we are knowing the service my $service = $self->service ; unless ($service) { $self->error("Service for this job is not known"); return 0 ; } &Info("Cancelling A2P::AfpJob $name on service '$service'"); $self->info("Cancelling"); # Fork before my $forked = fork ; if (defined($forked)) { if ($forked) { &Info("Cancelling $name job..."); &Debug("Forked process to cancel $name" . ( $self->service ? " on " . $self->service : "") ) if ($ADVANCED_DEBUGGING); return 1 ; } else { &Debug("Cancelling $name job "); # Set environment as in service $self->fake_service ; # Don't close DB connection when my job is done &a2p_db_avoid_disconnect_in_forked(); # Get status object my $status = $self->get_status_object ; unless ($status) { &Error("Can't get Status object of $name"); exit ; } # Initialize jobstatus $self->report("Cancelling"); &a2pjobstate( $status->{AFP} , 12 , 'o', { JOBID => $name, STATUS => 'CANCELLING', INFOS => 'Cancelling' } ) or &Info("Can't set '$name' job status"); ( $notdone , $busy ) = ( 1, 0 ) ; my $resync_timer = [ &gettimeofday() ] ; # Try to cancel the job while ( $notdone > 0 and $notdone <= 10 ) { my @ret = &TellListener( $service, JobManager => &GetCom( comJOB , $name => 'cancel' ), sub { while (@_) { my $info = shift ; &Debug("Cancel $name: $info") if ($ADVANCED_DEBUGGING); $self->report($info); $notdone = 0 if ( $info =~ /^CANCEL.*OK$/i ); $notdone = -1 if ( $info =~ /ANSWERED REQUEST TIME OUT/ or $info =~ /CANCEL.*KO/ ); $busy++ if ($info =~ /^ERROR.*MAX JOBS COUNT REACHED$/); } # Resync object in DB after a while if ( &tv_interval($resync_timer) > 5 ) { $resync_timer = [ &gettimeofday() ] ; $self->sync_with_jobs_queue('on_cancel') ; } }); &Debug("Cancel $name on $service returned '@ret'") if ( $ADVANCED_DEBUGGING and @ret ); if ( $notdone > 0 ) { last unless ( $notdone < 10 or $busy ); &Warn("Job $name cancel not done, retrying soon"); sleep 2 ; &Info("Cancelling again $name job"); $notdone ++ unless $busy ; $busy = 0 ; } } if ($notdone) { $self->error("Job $name seems not cancelled"); &Warn("Job $name seems not cancelled"); } else { &Info("Job $name cancelled"); &a2pjobstate( $status->{AFP} , 12 , 'o', { JOBID => $name, INFOS => 'Cancelled' } ) or &Info("Can't set info on '$name' job status"); } # Reset event and resync object in DB before quitting $self->unset_event ; exit $notdone ; } } else { &Warn("Can't fork to cancel $name job: $!"); } return 1 ; } sub date { # Set/return a date information for this jobs as seconds since EPOCH my $self = shift ; $self->{DATE} = shift if (@_ and $_[0]); return exists($self->{DATE}) ? $self->{DATE} : time ; } sub delete { # Just delete the job from the queue my $self = shift ; my ( $name, $jid ) = ( $self->name, $self->{jid} ); &Debug("Deletion of $name (row $jid)"); # We set the revision to a negative value and update the object return &job_queue_update( $jid, 0, 0, $name, '', -1 ) ; } sub details { # TODO implement # Return any useful informations for advanced mode my $self = shift ; return $self->infos ; } sub enqueue { # Enqueue the A2P::AfpJob in DB table my $self = shift ; my $errs = $self->error ; # Get the current errors count # 1. Set the mode if specified, or set it to user mode $self->mode(@_ ? $_[0] : 0 ); # 2. Force to synchronize us in jobs_queue # TODO Better check if the status is still in diagnostics table $self->sync_with_jobs_queue(1); # Return true if no new error has been set return $self->error == $errs ? 1 : 0 ; } sub enqueue_status { # Enqueue the A2P::Status for this job in DB table my $self = shift ; my $status = shift ; # A chained request must be composed of lower case and char ':' as separator my $chained = shift || '' ; $self->{STATUS_OBJECT} = $status ; $self->name($status->is_job); $self->jobname($status->{AFP}); &Info("Enqueuing ".$status->is_job." in jobs_queue table..."); # Enqueue in diagnostics table return &Error("AfpJob ".$self->name." not enqueued") unless ($self->enqueue); # Just ask update if not doing chained request return $self->update unless ( $chained =~ _chained_request_qr ); # Just ask for the event chained request &Info("Requesting '$chained' events on ".$self->name); return $event > 0 ? 1 : 0 if ( $event = $self->is_event_request( $self->lockid, $chained ) ); } sub error { # Add an error message, return an array of errors line # The error status will be true if any error message is set my $self = shift ; push @{$self->{ERRORS}}, @_ if (@_); return exists($self->{ERRORS}) ? @{$self->{ERRORS}} : () ; } sub fake_service { # Update environment to fake a service my $self = shift ; $LOCKID = $self->lockid ; $SCAN_SPOOL = 1 ; $KEEP_JOBSTATUS = 1 ; # Also consolidate db connection $RECONNECT_TIMER = 0 ; &a2p_db_connect ; } sub file { # Set/return path to AFP file my $self = shift ; my $file = $self->{FILE} || '' ; # Get the file from status if still not available or set unless ($file) { my $status = $self->get_status_object ; $file = $status->{FILE} if (exists($status->{FILE})); } # We need to set the full path to file my $afpspool = $self->afpspool ; if ( $file and $file !~ m|^/| and $afpspool ) { &Debug("Trying file '$file' in '$afpspool' as spool") if ($ADVANCED_DEBUGGING); $file = $afpspool . '/' . $file ; } else { my $jobname = $self->jobname ; my $folder = $afpspool . '/' . $self->lockid ; &Debug("Searching jobname '$jobname' in '$folder' folder") if ($ADVANCED_DEBUGGING); my $glob = $folder . '/*' . $jobname ; my @list = glob($glob); if (@list) { &Debug("Trying ".(@list>1?"first of ":"")."found: @list") if ($ADVANCED_DEBUGGING); $file = shift @list ; } else { $file = '' ; $self->error("No jobname '$jobname' found in '$folder' folder"); } } # Set error if job is not found if ( $file and ! -d $file and -e $file ) { &Debug("Found file is '$file' for ".$self->name) if ($ADVANCED_DEBUGGING); } else { delete $self->{SIZE} ; $self->error("File '$file' not found"); $file = '' ; } # Set also file size $self->{SIZE} = -s $file if $file ; return $self->{FILE} = $file ; } sub freeze { my $self = shift ; return Dumper($self) ; } sub get_next_from_db { my $self = shift ; my $service = shift ; # If provided, we are looking for event in jobs_queue # Get the next row as a hash with column names as keys my $row = &get_next_afpjob( $self->mode, $service ) or return 0 ; return 0 unless (defined($row) and ref($row) =~ /^HASH/); my $newself = &thaw($row->{THAWN}) ; return 0 unless ( ref($newself) =~ /^A2P::AfpJob/ ); # Update info which can be not synchronized at some time $newself->{jid} = $row->{JID} ; $newself->{DBTIME} = $row->{DBTIME} ; $newself->mode($self->mode); $newself->sync_with_jobs_status ; return $newself ; } sub _get_serviceid_of { unless (&a2p_db_connected() and $Infos) { &Debug("Initializing A2P::Infos object"); $Infos = &get_serviceid_info('check all'); return 0 unless $Infos ; } return $Infos->get_serviceid_of(@_); } sub get_size { # return the AFP file size my $self = shift ; my $size = 0 ; if (exists($self->{SIZE})) { $size = $self->{SIZE} ; } else { my $file = $self->file ; if ($file) { # Stange, FILE is set but not SIZE... try again to get size $! = 0 ; $self->{SIZE} = $size = -s $file ; $self->error("Can't get size of '$file': $!") unless (defined($size)); } else { $self->error("Can't know size of not found file"); } } return defined($size) ? $size : 0 ; } sub get_status_object { # Return a status object my $self = shift ; return $self->{STATUS_OBJECT} if (exists($self->{STATUS_OBJECT})); return $self->{STATUS_OBJECT} = new A2P::Status( {}, {} ) ; } sub handle_event { # Check the current event and do the requested job my $self = shift ; return 0 unless ($self->{event} and $self->{EVENT}); # A chained request can be composed of lower case and char ':' as separator my @events = grep { /^[a-z]+$/ } split( /:/, $self->{EVENT} ); # Remark: A chained events string should represent different event so the # later test must match # Check EVENT is really authorized my @possible_actions = ( 'start' , 'validate', 'delete', $self->actions ); unless ( @events == grep { $self->{EVENT} =~ /$_/ } @possible_actions ) { $self->error("Unauthorized to do '$self->{EVENT}'"); return 0 ; } # Really do the action my ( $ret , $deleted ) = ( 0 , 0 ); foreach my $todo (@events) { &Debug("Doing evaluation of self->$todo on ".$self->name) if ($ADVANCED_DEBUGGING); unless ( $ret = eval '$self->' . $todo ) { &Warn("Bad $todo event handling on ".$self->name); last ; } $deleted ++ if ( $todo eq 'delete' ); } if ($ret) { &Debug("Done '".$self->{EVENT}."' on ".$self->name) if ($ADVANCED_DEBUGGING); } else { &Error("Can't do '".$self->{EVENT}."' on ".$self->name); map { &Error($_) } $self->error ; } # Anyway release events $self->unset_event ; return $ret ? ( $deleted ? -1 : 1 ) : 0 ; } sub info { # Set info on the status object, can only be used in a2p-status service my $self = shift ; my $info = shift or return 0 ; my $status = $self->get_status_object ; $status->{INFOS} = $info ; &Debug("Information for ".$self->name." set to '$info'") if ($ADVANCED_DEBUGGING); # Set status is dirty and guaranty it will be push to status DB table $status->set_dirty_bit ; $status->set_dirty_bit(3) ; } my %afpjob_infos = ( DBTIME => 'Time update in DB', REV => 'diag_rev', NAME => 'jobname', JOBNAME => 'afpname', JOBID => 'jobname', STATUS => 'status', ERRORS => 'errors', FILE => 'file', SIZE => 'file_size', WARNINGS => 'warn', DESTIDS => 'destid', jid => 'db_jid', EVENT => 'event', REPORTS => 'report', SERVICE => 'a2p_service', AFPSPOOL => 'a2p_afpspool', ZIPFILE => 'a2p_zipfile' ); my %status_infos = ( STATE => 'states', REV => 'rev', LOCKID => 'lockid', STEP => 'step', SID => 'status_row', JOBID => 'jobname', STATUS => 'status', ABTERM => 'abterm', NBJOBS => 'nbjobs', AFP => 'afpname', INFOS => 'infos', DAY => 'day', JOBS => 'jobs', RESTART => 'restart' ); sub infos { # Return any useful informations for at least user mode my $self = shift ; my $status = $self->get_status_object ; # Update zipfile member $self->search_zipfile ; # Get infos from A2P::AfpJob my %infos = map { $afpjob_infos{$_} => ref($self->{$_}) =~ /^array/i ? join("\n", grep { length($_) } @{$self->{$_}}) : $self->{$_} } keys(%afpjob_infos) ; map { $infos{$status_infos{$_}} = ref($status->{$_}) =~ /^array/i ? join("\n", grep { length($_) } @{$status->{$_}}) : $status->{$_} } keys(%status_infos) if ($self != $status); # Adapt special case if (exists($infos{states})) { $infos{states} = join("",split(/\n/,$infos{states})) ; } # Delete any empty value map { delete $infos{$_} } grep { ! length($infos{$_}) } keys(%infos); return \%infos ; } sub is_abterm { # Return true if the status is abterm my $self = shift ; return 1 if ($self->bad_request); my $status = $self->get_status_object || 0 ; return $status ? $status->is_abterm : 0 ; } sub is_done { # Return true if the status is done my $self = shift ; my $status = $self->get_status_object || 0 ; return $status ? $status->is_done : 0 ; } sub is_event_request { # Return boolean if it's an event request, the boolean is negative if it's # a faulty event request my $self = shift ; return 0 if ($self->{event}); # Check if lockid is provided my $lockid = shift || $self->lockid ; # A chained request must be composed of lower case and char ':' as separator my $request = shift || 'update' ; &Debug("Initiating $request on ".$self->name) if ($ADVANCED_DEBUGGING); return $self->set_event_on( $request, $lockid ) ? 1 : -1 ; } sub jobname { # Set/return the jobname my $self = shift ; $self->{JOBNAME} = shift if @_ ; return exists($self->{JOBNAME}) ? $self->{JOBNAME} : '' ; } sub lockid { # Set/return the lockid my $self = shift ; my $status = $self->get_status_object || $self ; $status->{LOCKID} = shift if @_ ; return exists($status->{LOCKID}) ? $status->{LOCKID} : '' ; } sub md5 { # Set/return the md5 to AFP file my $self = shift ; unless (exists($self->{MD5})) { my $file = $self->file ; my $md5 = 0 ; if ($file) { my $md5cmd = qx/md5sum "$file"/ ; my ( $result , $control ) = $md5cmd =~ /^(\w+)\s+(.+)$/ ; if ( $control eq $file ) { $md5 = $result ; } else { $self->warning("Can't retrieve md5 of '$file' file"); } } else { $self->warning("Can't compute md5 of not found file"); } $self->{MD5} = $md5 ; } return exists($self->{MD5}) ? $self->{MD5} : 0 ; } sub mode { # Return/set the mode my $self = shift ; if (@_) { my $mode = shift || 0 ; # Mode can only be 0: user mode or 1: advanced mode $self->{MODE} = ($mode =~ /^\d+$/ and $mode)? 1 : 0 ; } return exists($self->{MODE}) ? $self->{MODE} : 0 ; } sub name { # Return/set a name to identify this AFP job my $self = shift ; if (@_) { my $name = shift ; # Set a default name if called with a false value $name = sprintf( "A2P-AFPJOB-%s", &ShortID ) unless (defined($name) and $name); $self->{NAME} = $name ; } return $self->{NAME} = (defined($self->{NAME}) and $self->{NAME}) ? $self->{NAME} : ( $self->{JOBID} ? $self->{JOBID} : ( $self->{JOBNAME} ? $self->{JOBNAME} : "" )); } sub nbjobs { # Set/return the number of sub-jobs from status object my $self = shift ; my $status = $self->get_status_object || $self ; $status->{NBJOBS} = shift if @_ ; return exists($status->{NBJOBS}) ? $status->{NBJOBS} : 0 ; } sub open_ticket { # TODO implement # Open a ticket for this job my $self = shift ; &Debug("Opening ticket still not supported for ".$self->name); return 0 ; } sub otherspools { # Set/return the DONESPOOL & ERRORSPOOL for this job my $self = shift ; $self->{DONESPOOL} = shift if (@_ and $_[0]); $self->{ERRORSPOOL} = shift if (@_ and $_[0]); return map { exists($self->{$_}) ? $self->{$_} : '' } qw( DONESPOOL ERRORSPOOL ) ; } sub pause { # TODO implement # Pause this job my $self = shift ; &Debug("Pause still not supported for ".$self->name); return 0 ; } sub report { # Add reporting message, return an array of reporting lines my $self = shift ; push @{$self->{REPORTS}}, @_ if (@_); return exists($self->{REPORTS}) ? @{$self->{REPORTS}} : () ; } sub resume { # TODO implement # Resume this paused job my $self = shift ; &Debug("Resume still not supported for ".$self->name); return 0 ; } sub search_zipfile { my $self = shift ; return &Debug("DONESPOOL not set") unless (defined($self->{DONESPOOL})); return &Debug("ERRORSPOOL not set") unless (defined($self->{ERRORSPOOL})); my $base = $self->lockid . '-' . $self->name ; my @files = glob( $self->{DONESPOOL} . '/' . $base . '*.zip' ); push @files, glob( $self->{ERRORSPOOL} . '/' . $base . '*.zip' ); return $self->{ZIPFILE} = \@files ; } sub service { # Set/return the service for the status object my $self = shift ; $self->{SERVICE} = shift if @_ ; return defined($self->{SERVICE}) ? $self->{SERVICE} : '' ; } sub set_event_on { # Set the event flag on this job in the jobs_queue table my $self = shift ; $event = shift ; # Event is related to a lockid my $lockid = shift || $self->lockid ; unless ($lockid) { $self->error("Can't set serviceid when lockid is unknown"); return 0 ; } # Get the serviceid for that LOCKID my $serviceid = &_get_serviceid_of($lockid); unless ($serviceid) { $self->error("Can't found serviceid of $lockid lockid"); return 0 ; } &Debug("Got serviceid $serviceid for lockid $lockid"); my ( $name, $jid ) = ( $self->name, $self->{jid} ); &Debug("Setting event $serviceid on $name (row $jid)"); # Set event flag with serviceid and event callback $self->{event} = $serviceid ; $self->{EVENT} = $event ; # Synchronize with table return $self->sync_with_jobs_queue('event') ; } sub set_status_object { # Set the status object my $self = shift ; my $status = shift ; return 0 unless (ref($status) =~ /^A2P::Status/); return $self->{STATUS_OBJECT} = $status ; } sub start { # Start this job with 2 cases: # 1. Called from Webmin -> we want it to be called by the right a2p-status # 2. Called by a2p-status after it detects it has to start a job my $self = shift ; my $name = $self->name ; unless ($self->{event}) { # Check if lockid is provided my $lockid = shift || $self->lockid ; &Debug("Initiating start on $name") if ($ADVANCED_DEBUGGING); return $self->set_event_on( 'start', $lockid ); } # Really start now my $file = $self->file ; # Check file definition unless ( $file and $file !~ m|/$| ) { $self->error("File for this job is not known"); return 0 ; } # Check file existence # TODO Add check file is good AFP file unless ( ! -d $file and -e $file ) { $self->error("File '$file' not available"); return 0 ; } # Check we are knowing the service my $service = $self->service ; unless ($service) { $self->error("Service for this job is not known"); return 0 ; } &Info("Restarting A2P::AfpJob $name with file '$file'"); $self->info("Restarting"); # To start we would to call directly the service, but fork before my $forked = fork ; if (defined($forked)) { if ($forked) { &Debug("Forked process to restart $name on $service") if ($ADVANCED_DEBUGGING); return 1 ; } else { # Set environment as in service $self->fake_service ; # Don't close DB connection when my job is done &a2p_db_avoid_disconnect_in_forked(); # Get status object my $status = $self->get_status_object ; unless ($status) { &Error("Can't get Status object of $name"); exit ; } my $report = $self->{EVENT} =~ /delete/ ? 0 : 1 ; $self->{event} = 0 if $report ; # Initialize jobstatus with AfpName to keep restarts in mind $self->report("Restarting") if ($report); &a2pjobstate( $status->{AFP} , 1 , 'o', { AFP => $status->{AFP}, JOBID => $name, STATUS => 'STARTING', FILE => $file, INFOS => 'Restarting' } ) or &Info("Can't initialize '$name' job status"); ( $notdone , $busy ) = ( 1, 0 ) ; my $resync_timer = [ &gettimeofday() ] ; my $ABTERM = "" ; # Try to restart the job while ( $notdone > 0 and $notdone <= 10 ) { my @ret = &TellListener( $service, JobManager => &GetCom( comJOB , $name => $file ), sub { while (@_) { my $info = shift ; &Debug("Running $name: $info") if ($ADVANCED_DEBUGGING); $self->report($info) if ($report); $notdone = 0 if ( $info =~ /^JOBSTATUS.*OK$/i ); $notdone = -1 if ( $info =~ /ANSWERED REQUEST TIME OUT/ or $info =~ /JOBSTATUS.*KO/ ); $busy++ if ($info =~ /^ERROR.*MAX JOBS COUNT REACHED$/); $ABTERM = $1 if ( $info =~ /ABTERM:\s*(.*)$/i ); } # Resync object in DB after a while if ( &tv_interval($resync_timer) > 5 ) { $resync_timer = [ &gettimeofday() ] ; $self->sync_with_jobs_queue('on_restart') ; } }); &Debug("Start $name on $service returned '@ret'") if ( $ADVANCED_DEBUGGING and @ret ); if ( $notdone > 0 ) { last unless ( $notdone < 10 or $busy ); &Warn("Job $name not done, retrying soon"); sleep 2 ; &Info("Retrying restart of $name job"); $notdone ++ unless $busy ; $busy = 0 ; } } if ($notdone) { $self->error("Job $name not restarted"); &Error("Job $name not restarted"); &a2pjobstate( $status->{AFP} , 12 , 'o', { AFP => $status->{AFP}, JOBID => $name, STATUS => 'KO', INFOS => $ABTERM || 'Restart failed' } ) or &Info("Can't set failed info on '$name' job status"); } else { &a2pjobstate( $status->{AFP} , 12 , 'o', { AFP => $status->{AFP}, JOBID => $name, INFOS => 'Restart done' } ) or &Info("Can't set info on '$name' job status"); } # Reset event and resync object in DB before quitting $self->unset_event if ($report); exit $notdone ; } } $self->error("Can't fork to ask $service service to restart $name"); return &Error("Can't fork to ask $service service to restart $name"); } sub status { # Set/return the status from status object my $self = shift ; return 'BAD REQUEST' if ($self->bad_request); my $status = $self->get_status_object || $self ; $status->{STATUS} = shift if @_ ; return defined($status->{STATUS}) ? $status->{STATUS} : '' ; } sub step { # Set/return the step from status object my $self = shift ; my $status = $self->get_status_object || $self ; $status->{STEP} = shift if @_ ; return defined($status->{STEP}) ? $status->{STEP} : 0 ; } sub _row_index { my $self = shift ; return ( $self->{jid}, $self->{MODE}, $self->{event}, $self->name, $self->jobname, $self->{REV} ) ; } sub sync_with_jobs_queue { # sync ourself to jobs_queue table in DB my $self = shift ; if ( ! ( defined($self->{jid}) and $self->{jid} ) or ( defined($_[0]) and $_[0] ) ) { # Called with arg will push this object as new version $self->{REV} = exists($self->{REV}) ? $self->{REV}+1 : 0 ; # Call DB API with expected row my $jid = &job_queue_update( $self->_row_index, $self->freeze ); # Set jid when it was unknown and it is returned, and update us in DB... if (!$self->{jid} and $jid) { $self->{REV} ++ ; $self->{jid} = $jid ; $jid = &job_queue_update( $self->_row_index, $self->freeze ); } $self->error("Can't update $self->name in DB with JID $jid") unless ($jid and $self->{jid} == $jid); } else { # Get the one from DB and return it if more recent # The API should return a storable newer object or undef my $thawn = &job_queue_update( $self->_row_index ); my $newself = &thaw($thawn) ; if (ref($newself) =~ /^A2P::AfpJob/) { # Only replace us to newer version if really more recent $self = $newself if ($newself->{REV} >= $self->{REV}); } else { $self->error("Bad object retrieved from DB (".ref($thawn). ";".ref($newself).")"); } } # Return self as synchronized object return $self ; } sub sync_with_jobs_status { # sync ourself to jobs_status table in DB my $self = shift ; my $status = $self->get_status_object or return &Debug("No status object for ".$self->name) ; return &Debug("No status SID for ".$self->name) unless ($status->get_sid_index); # Reset checked in db timer $status->checked_in_db_timer(0); $status->check_update_in_db ; # Return self as synchronized object return $self ; } sub thaw { my $freezed = shift ; my $object = eval $freezed ; return $object ; } sub unset_event { # Set the event flag on this job in the jobs_queue table my $self = shift ; my ( $name, $jid ) = ( $self->name, $self->{jid} ); &Debug("Resetting event on $name (row $jid)"); # Set event flag with serviceid $self->{event} = 0 ; delete $self->{EVENT} ; # Synchronize with table return $self->sync_with_jobs_queue('reset_event') ; } sub update { # Really do nothing, but this is a convenient way to update as action my $self = shift ; # Check if it's an event request return $event > 0 ? 1 : 0 if ($event = $self->is_event_request($_[0],'update')); &Debug("Updating informations for ".$self->name); return 1 ; } sub validate { my $self = shift ; # Check if it's an event request return $event > 0 ? 1 : 0 if ($event = $self->is_event_request($_[0],'validate')); my $name = $self->name ; # Fork before doing validation my $forked = fork ; if (defined($forked)) { if ($forked) { &Info("Validating $name job..."); &Debug("Forked process to validate $name" . ( $self->service ? " on " . $self->service : "") ) if ($ADVANCED_DEBUGGING); return 1 ; } else { # Set environment as in service $self->fake_service ; # Don't close DB connection when my job is done &a2p_db_avoid_disconnect_in_forked(); my $status = $self->get_status_object ; unless ($status) { &Error("Can't get Status object of $name"); exit ; } # Initialise status as new status &a2pjobstate( $status->{AFP} , 1 , 'V', { AFP => $status->{AFP}, JOBID => $name, STATUS => 'STARTING', INFOS => 'Validating' } ) or &Info("Can't initialize '$name' job status validation"); foreach my $step (2..12) { my $state = $status->step_status($step); next unless ($state eq 'A'); &Debug("Validating $name step $step..."); &a2pjobstate( $status->{AFP} , $step , 'o', { JOBID => $name, STATUS => 'DONE', INFOS => 'Validating' } ) or &Warn("Can't validate '$name' job status at step $step"); } # Validate jobstatus if ( &a2pjobstate( $status->{AFP} , 12 , 'o', { JOBID => $name, STATUS => 'DONE', INFOS => 'Validated' } ) ) { &Info("Validated $name job"); } else { &Error("Can't validate '$name' job status"); } # Finally quit this forked process exit ; } } else { &Warn("Can't fork to validate $name status: $!"); } return 1 ; } sub warning { # Add warning message, return an array of warning lines my $self = shift ; push @{$self->{WARNINGS}}, @_ if (@_); return exists($self->{WARNINGS}) ? @{$self->{WARNINGS}} : () ; } &Debug("Module " . __PACKAGE__ . " v$VERSION loaded"); 1;