#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: Status.pm 3 2007-10-18 16:20:19Z guillaume $
#
# Class to implement one job status object handling its tied version
#

package A2P::Status ;

use strict;
use Storable qw( freeze thaw );
use Time::HiRes qw( usleep );
use A2P::Globals ;
use A2P::Syslog ;
use A2P::Tools qw( SharedList FreeSharedList ms );
use A2P::DB qw( job_status_update is_job_status_updated get_next_jobstatus
                job_status_delete );

BEGIN {
    our $VERSION = sprintf "%s", q$Rev: 1158 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;

# Set initial status constant to default array at compilation.
# The sub name is misspelled (INTIAL) but is kept as is because code outside
# this file may call it.
sub INTIAL_STATUS { split(//,'o__00000000__') }

my $CACHE     = {} ; # Status object cache
my $TIEHASH   = {} ; # Tied hash
my $DB        = {} ; # Status DB informations
my $SID_INDEX = 0 ;  # Index to initialize status unique ID, only in a2p-status
my %MESG      = () ; # Used in StatusDebug to avoid same messages

### Private members related to SID management ###

# Reference of sid cache list: the hash stored under the '__SID__' key of the
# current cache, created on first use
sub SID_CACHE {
    defined($CACHE->{__SID__}) ? $CACHE->{__SID__} : ( $CACHE->{__SID__} = {} )
}

# Build the tied hash key of a numbered SID.
# Only call _get_sid_tied_key when we are sure first arg is a number
sub _get_sid_tied_key { '_SID_' . (@_ ? $_[0] : "" ) }
my $MATCH_SID_TIED_KEY = qr/^_SID_(\d+)$/ ;

# Revision keys extension
sub _rev_ext { '_rev' }
sub _rev_ext_re { qr/_rev$/ }

#################################################

# Return true when the current tied hash is really tied and the cache flags
# this process as the writer of the tied file
sub _is_tie_writer {
    # Status object must not be used before this key is set in cache
    return ( tied(%$TIEHASH) and defined($CACHE->{__GDBM_WRITER__})
        and $CACHE->{__GDBM_WRITER__} );
}

# Constructor: new( CACHE_HASHREF, TIEHASH_HASHREF [, JOBNAME] )
# Returns the status object for JOBNAME ('noname' by default), taken from the
# cache, from the tied hash when that version is newer, or freshly created.
# Returns undef when the two first arguments are not hash references.
# Side effect: the given cache and tied hash become the module wide ones.
sub new {
    my $class = shift ;
    my $self = undef ;

    # Return if CACHE and TIEHASH are not hash
    return $self unless (ref($_[0]) =~ /^HASH/ and ref($_[1]) =~ /^HASH/);

    &TIMESTAT('STATUS-GET-NEW');

    $CACHE = shift ;
    $TIEHASH = shift ;
    my $job = shift || 'noname' ;

    # Try to get the status from CACHE, first as a SID then as a plain key
    if (defined(SID_CACHE->{$job})) {
        $self = SID_CACHE->{$job}->check_cached_aged ;

    } elsif (defined($CACHE->{$job})) {
        $self = $CACHE->{$job}->check_cached_aged ;
    }

    unless (defined($self)) {
        # Job not in cache, prepare to check tied hash by constructing a
        # minimal object
        $self = { JOB => $job, INITIAL => $job } ;
        bless $self , $class ;
        $self->_set_defaults ;
    }

    $self->is_job($job);

    # Get ourself from tied hash if I was updated in tied hash
    $self = $self->compared_with_one_from_tiehash ;

    # Keep references in cache when not cached
    $self->cached unless ($self->is_cached);

    # Check SID: Set to 0 by default unless in a2p-status
    # NOTE(review): get_sid_index always returns a defined value, so this
    # statement looks unreachable - confirm the intent before changing it
    $self->set_sid_index(&_found_sid) unless (defined($self->get_sid_index));

    # Delete initial tag for newly created object used as ident when destroyed
    if (exists($self->{INITIAL})) {
        #&StatusDebug("new Status object v$VERSION created for $job");
        delete $self->{INITIAL} ;
    }

    &TIMESTAT('STATUS-GET-NEW');

    return $self ;
}

# Initialize the object members. STEP, STATE, LOCKID and DAY are always
# reset, the other members only when they are missing, and dirty bits 1 to 16
# are cleared.
sub _set_defaults {
    # Be careful, some defaults must only be set the first time
    my $self = shift ;

    $self->{JOB}    = "noname"      unless (defined($self->{JOB}));
    $self->{AFP}    = $self->is_job unless (defined($self->{AFP}));
    $self->{JOBID}  = ""            unless (defined($self->{JOBID}));
    $self->{STEP}   = 2 ; # We are still at step when jobstatus is created
    $self->{STATE}  = [ INTIAL_STATUS ] ;
    $self->{LOCKID} = $LOCKID ;
    $self->{DAY}    = 19700101 ;
    $self->{BORN}   = 0 unless (exists($self->{BORN}));
    $self->{REV}    = 0 unless (defined($self->{REV}));
    $self->{TIMER}  = 0 unless (defined($self->{TIMER}));

    # Reset dirty bits
    $self->unset_dirty_bit($_) for 1 .. 16 ;

    return ;
}

# Set our SID when given as an unsigned integer, otherwise forget it
sub set_sid_index {
    # SID must set as a number
    my $self = shift ;
    my $sid = shift ;

    if (defined($sid) and $sid =~ /^\d+$/) {
        $self->{SID} = $sid ;
        # Update our predictive internal SID index counter
        &_found_sid( $sid );

    } elsif (exists($self->{SID})) {
        delete $self->{SID} ;
    }
}

# Return our SID, 0 when none is set
sub get_sid_index {
    # Must return a number as SID
    return defined($_[0]->{SID}) ? $_[0]->{SID} : 0 ;
}

# get_sid_key( [SID_OR_KEY] )
# With a true argument: return the '_SID_<n>' key for a number, or the
# argument itself otherwise. Without one: return the key of our own SID, or
# our AFP name when we have no SID.
sub get_sid_key {
    # Return an unique SID key for the tied hash
    my $self = shift ;

    if (@_ and $_[0]) {
        return $_[0] =~ /^\d+$/ ? _get_sid_tied_key($_[0]) : $_[0] ;
    }

    # self->AFP is the default SID key
    return $self->get_sid_index ?
        _get_sid_tied_key($self->get_sid_index) : $self->{AFP} ;
}

# get_sid_version_key( [SID_OR_KEY] )
# Return the tied hash key holding the versioning of the given SID or key,
# ours by default
sub get_sid_version_key {
    my $key = defined($_[1]) ?
        ( $_[1] =~ /^\d+$/ ? $_[0]->get_sid_key($_[1]) : $_[1] )
        : $_[0]->get_sid_key ;

    return $key . _rev_ext() ;
}

# Return true when the given key ends like a versioning key
sub _is_revision_key {
    return $_[0] =~ _rev_ext_re ;
}

# _found_sid( [SID] )
# With a numbered SID: keep it as the highest known index when greater and
# return it. Without: return 0 while no index is known, otherwise return the
# next index used neither in the SID cache nor in the tied hash.
sub _found_sid {
    # API only useful in a2p-status
    if (@_ and $_[0] =~ /^\d+$/) {
        my $sid = shift ;
        $SID_INDEX = $sid if ( $sid > $SID_INDEX );
        return $sid ;
    }

    return 0 unless ($SID_INDEX);

    # Skip any index still in use (loop instead of recursing on each one)
    do {
        $SID_INDEX ++ ;
    } while (defined(SID_CACHE->{$SID_INDEX})
        or defined($TIEHASH->{_get_sid_tied_key($SID_INDEX)}));

    return $SID_INDEX ;
}

# Follow the references in the tied hash starting from our job name and
# return the key under which our status is currently stored
sub current_in_tie_sid_key {
    my $self = shift ;

    my $sid = $self->is_job ;
    my $key = 0 ;
    my %REFS = () ; # Reference counters to avoid infinite loops

    # Retrieve the tiehash SID as a number (it is 0 if not a SID number)
    while ($sid and $sid !~ /^\d+$/
        and (!defined($REFS{$sid}) or $REFS{$sid} < 2)) {

        if ($sid =~ $MATCH_SID_TIED_KEY) {
            # Extract SID from key
            $sid = $1 ;

        } else {
            $REFS{$sid} ++ ; # Update reference counter to avoid infinite loops

            # SID is a reference to another key or SID is invalid
            if (defined($TIEHASH->{$sid})) {
                if (defined($TIEHASH->{$self->get_sid_version_key($sid)})) {
                    # A versioning key exists: this key holds the status
                    $key = $sid ;
                    $sid = 0 ;
                    last ;

                } else {
                    $sid = $TIEHASH->{$sid} ;
                }

            } else {
                $sid = 0 ;
            }
        }
    }

    # If not a numbered SID, we will retrieve from jobname
    $key = $self->get_sid_key($sid) unless ($key);

    #&StatusDebug("Status SID for ".$self->is_job." has changed".
    #    ($self->get_sid_index?" from ".$self->get_sid_index:"")." to $sid")
    #    if ($sid and $self->get_sid_index != $sid);

    return $key ;
}

# Return the object to use for our job: the one thawed from the tied hash
# when is_older_than_tied says it is newer, ourself otherwise
sub compared_with_one_from_tiehash {
    my $self = shift ;

    my $job = $self->is_job ;

    # Retrieve the key of the object in the tied hash, needed to check version
    my $sidkey = $self->current_in_tie_sid_key ;

    if (defined($TIEHASH->{$sidkey})) {
        #&StatusDebug("Checking $sidkey tied version");

        # 1. The revision from tied hash can be a newer version
        &TIMESTAT('STATUS-GET-COMPARED');
        my $older = $self->is_older_than_tied($sidkey);
        &TIMESTAT('STATUS-GET-COMPARED');

        if ( $older ) {
            &TIMESTAT('STATUS-GET-TIED');
            &UPSTAT('HIT-TIED-STATUS');

            # is_tied API must return 0 or a A2P::Status object
            my $newself = $self->is_tied($sidkey);

            if (defined($newself) and $newself) {
                if ($newself != $self) {
                    #&StatusDebug("Replacing $job by tied $sidkey (" .
                    #    $newself->is_job . ") at rev. " .
                    #    $newself->get_revision );

                    # Don't forget to update it with current job name
                    $newself->is_job($job);
                    $self = $newself ;
                }

            } else {
                &StatusDebug("Can't get $job Status from local file");
            }

            &TIMESTAT('STATUS-GET-TIED');

        # 2. Our cached version is newer revision, we still got it
        } else {
            &UPSTAT('HIT-CACHED-STATUS');
            #&StatusDebug("Keep cached '$job' status from memory at rev. "
            #    . $self->get_revision);
        }
    }

    return $self ;
}

# Return 1 when the status is DONE, has dirty bit 3 unset and was saved in
# the tied hash more than $STATUS_MAXAGE ago, 0 otherwise
sub is_done_and_timer_aged {
    my $self = shift ;

    # Only set the status is aged when parameter is set and status is DONE and
    # the dirty bit 3 is not set (not synched in DB by a2p-status)
    return 0 unless ( $STATUS_MAXAGE and $self->is_done
        and ! $self->dirty_bit(3));

    # Get the timer based age
    my $age = $self->is_tied_timer ? &ms() - $self->is_tied_timer : 0 ;

    &MAXSTAT("STATUS-TIMER-AGE",$age);

    return $age > $STATUS_MAXAGE ? 1 : 0 ;
}

# Set DAY to the current local date as YYYYMMDD and BORN to the current time
# given by ms() unless it is already set to a true value
sub _born {
    my $self = shift ;

    # Set day status; localtime gives the year as an offset from 1900
    my @time = localtime(time);
    $self->{DAY} = ($time[5] + 1900)*10000 + ($time[4]+1)*100 + $time[3] ;

    # Set born time to the current millisecond from the epoch
    $self->{BORN} = &ms() unless (defined($self->{BORN}) and $self->{BORN});
}

# Reference ourself in the cache under our JOB, JOBID and AFP names, each of
# our JOBS entries and our SID, then refresh the cache timer. Returns 1.
sub cached {
    &TIMESTAT('STATUS-OBJECT-CACHED');

    my $self = shift ;

    # Update every important key in cache
    foreach my $member (qw{ JOB JOBID AFP }) {
        next unless (defined($self->{$member}) and $self->{$member});
        $CACHE->{$self->{$member}} = $self ;
    }

    # Check jobs reference in cache
    my @jobs = defined($self->{JOBS}) ? @{$self->{JOBS}} : ();
    $CACHE->{$_} = $self for grep { defined } @jobs ;

    # Check SID usable in CACHE
    SID_CACHE->{$self->get_sid_index} = $self if $self->get_sid_index ;

    # Set status is cached
    $self->cache_timer ;
    $self->is_cached(1);

    &TIMESTAT('STATUS-OBJECT-CACHED');

    1 ;
}

sub cache_timer {
    # Update the cache timer
    $_[0]->{TIMER} = &ms() ;
}

# Return the cache timer, 0 when not set
sub timer {
    return $_[0]->{TIMER} || 0 ;
}

# is_cached( [FLAG] )
# Read the "is cached" flag (dirty bit 10), or set it to FLAG when given
sub is_cached {
    my $self = shift ;

    return $self->dirty_bit(10) unless (@_);

    return ( defined($_[0]) and $_[0] ) ?
        $self->set_dirty_bit(10) : $self->unset_dirty_bit(10) ;
}

############## Dirty bits ############################ Dirty bits ##############
#  1 -> Dirty cache: Cached status must be written to tied status hash
#  2 -> Cache synched with DB, need to written new version in tied status hash
#  3 -> Status is new or is not done in DB -> can't be removed from tied hash
#  4 -> Cache and hash are synched: versioning can be updated in DB
# 10 -> Status object is flagged as cached (see is_cached)
############## Dirty bits ############################ Dirty bits ##############

# dirty_bit( [BIT [, VALUE]] )
# Return the value of the given bit (bit 1 by default, bit 0 can't be
# addressed), after setting it to 1 or 0 when VALUE is given
sub dirty_bit {
    my $self = shift ;
    my $set = shift || 1 ;

    if (@_) {
        # The bit vector must start as an empty string: vec() handles a
        # numeric 0 as the string "0" (0x30) where bits 4 and 5 are yet set
        $self->{DIRTY} = '' unless (exists($self->{DIRTY}));
        vec($self->{DIRTY},$set,1) = (defined($_[0]) and $_[0]) ? 1 : 0 ;
    }

    return exists($self->{DIRTY}) ? vec($self->{DIRTY},$set,1) : 0 ;
}

sub set_dirty_bit {
    # Set a bit with dirty bit API
    my $self = shift ;
    my $bit = shift || 1 ;
    return $self->dirty_bit( $bit, 1 );
}

sub unset_dirty_bit {
    # Unset a bit with dirty bit API
    my $self = shift ;
    my $bit = shift || 1 ;
    return $self->dirty_bit( $bit, 0 );
}

# Return the time elapsed since the cache timer was set, 0 when not set
sub get_cached_age {
    return $_[0]->timer ? &ms() - $_[0]->timer : 0
}

# Return ourself flagged as cached, or undef after removing ourself from the
# cache when we are cached for longer than $STATUS_CACHE_MAXAGE. Nothing is
# checked when $STATUS_CACHE_MAXAGE is not set.
sub check_cached_aged {
    my $self = shift ;

    # Check if our cache is too aged
    if ($STATUS_CACHE_MAXAGE and defined($self)) {
        my $age = $self->get_cached_age ;

        if ( $age > $STATUS_CACHE_MAXAGE ) {
            &MAXSTAT('STATUS-OBJECT-CACHE-AGE',$age);
            &UPSTAT('CACHED-STATUS-OBJECT-CLEANED');
            $self->remove_from_cache ;
            $self = undef ;

        } else {
            # Set status is cached
            $self->is_cached(1);
        }
    }

    return $self ;
}

# Return the time elapsed since BORN, 0 when BORN is not defined
sub real_age {
    return defined($_[0]->{BORN}) ? &ms() - $_[0]->{BORN} : 0 ;
}

# StatusDebug( MESSAGES )
# Debug messages prefixed by the caller line, only when $ADVANCED_DEBUGGING
# is set. Unless the program name starts with TEST, a message keyed by the
# first argument is skipped while its time out is not reached, and is then
# output with its repeat count.
sub StatusDebug {
    return 1 unless $ADVANCED_DEBUGGING ;

    my $add = "" ;

    if ($0 !~ /^TEST/) {
        my $seen = $MESG{$_[0]} ;

        if (defined($seen) and @{$seen}) {
            my ( $repeat, $timeout ) = @{$seen} ;
            $seen->[0] ++ ;
            return 1 unless ( &ms() >= $timeout );
            $add = " ($repeat times)" unless ($repeat==1);

        } else {
            $MESG{$_[0]} = [ 1 ] ;
        }
    }

    $MESG{$_[0]}->[1] = &ms(60) ; # Keep time out in one minute

    my ($package, $filename, $line) = caller ;

    return &Debug( map { "at L.$line: " .$_ .$add } @_ );
}

# Return our LOCKID, the global $LOCKID when not defined
sub get_lockid {
    return defined($_[0]->{LOCKID}) ? $_[0]->{LOCKID} : $LOCKID ;
}

# _format_info_to_keep( HASHREF, KEY, VALUE )
# Return the value to store in HASHREF for KEY: VALUE itself, except for the
# ABTERM and ERRORS keys where VALUE is appended to the array still stored
# under KEY (a new one otherwise) and that array reference is returned
sub _format_info_to_keep {
    my $this = shift ;
    my $key = shift ;
    my $value = shift ;

    # Only handle the special cases 'ABTERM' and 'ERRORS'. The alternation is
    # grouped so both anchors apply to each name
    return $value unless ($key =~ /^(?:ABTERM|ERRORS)$/);

    # Manage ABTERM case as growing array stripping ABTERM string
    my $ref = ref($this->{$key}) =~ /^ARRAY/i ? $this->{$key} : [] ;

    $value =~ s/^ABTERM[:]*\s*// ;

    push @{$ref} , $value ;

    return $ref ;
}

# infos( HASHREF )
# Store the given key/value pairs in the object, or in the sub-hash named by
# the JID entry when there is one (the JID entry is removed from the given
# hash and becomes our job name). Values are cleaned from unexpected chars.
# Returns 1, or a false value when the argument is not a hash reference.
sub infos {
    my $self = shift ;
    my $Infos = shift || {} ;

    # Check $Infos is hash ref
    return &Debug("Can't handle such information format ($Infos)") && 0
        unless ( ref($Infos) =~ /^HASH/i );

    my $tmpref = $self ;

    # When called with a JID, we will have specific informations
    if (defined($Infos->{JID})) {
        # Update what we are for next informations
        $self->{$Infos->{JID}} = {} unless (defined($self->{$Infos->{JID}}));
        $tmpref = $self->{$Infos->{JID}} ;
        $self->is_job($Infos->{JID});
        delete $Infos->{JID} ;
    }

    foreach my $key (keys(%{$Infos})) {
        my $info = $Infos->{$key} ;
        $info = "" unless (defined($info));

        # Strip not cesure chars and strip strings
        $info =~ s|[^ .0-9A-Za-z%_/-]||g ;
        $info =~ s/^\s+// ;
        $info =~ s/\s+$// ;

        $tmpref->{$key} = _format_info_to_keep($tmpref, $key, $info) ;
    }

    # Also auto update
    $self->_auto_update ;

    1 ;
}

# Update STATUS and the jobs index from the last stored informations
sub _auto_update {
    my $self = shift ;

    # Check to update STATUS to ABTERM if ABTERM defined and STATUS is empty
    $self->{STATUS} = 'ABTERM'
        if (defined($self->{ABTERM}) and $self->{ABTERM}
            # Control we are not trying to validate the job
            and ( ref($self->{ABTERM}) !~ /^ARRAY/i or
                $self->{ABTERM}->[$#{$self->{ABTERM}}] !~ /^validated/i )
            and ( ! defined($self->{STATUS}) or ! $self->is_abterm ));

    # Check job count
    if (my ( $number ) = $self->{JOB} =~ /-(\d+)$/) {
        # Update jobs index list with jid hash ref
        $self->{JOBS}->[$number] = $self->is_job ;
        $self->{NBJOBS} = $number
            if (!defined($self->{NBJOBS}) or $self->{NBJOBS} < $number );
    }
}

# NOTE(review): each alternative is anchored on one side only, so this also
# matches any status starting with "ko" or ending with "abterm". Left as is
# in case status texts rely on it - confirm before tightening.
my $ko_status = qr/^ko|abterm$/i ;
sub is_abterm {
    return $_[0]->{STATUS} =~ $ko_status ;
}

my $done_status = qr/^DONE$/i ;
sub is_done {
    return $_[0]->{STATUS} =~ $done_status ;
}

# is_filtered( JOB_FILTER_REF, DAY_FILTER_REF, STATUS_FILTER_REF )
# The three filters are references to scalars. The status filter is a
# pattern which can be negated with a leading '!'. The job filter is a
# pattern or a compiled regex checked against AFP and JOBID.
sub is_filtered {
    # Need to return 1 when not matching filters, filters are references
    my $self = shift ;
    my $job_filter = shift ;
    my $day_filter = shift ;
    my $status_filter = shift ;

    # The three filters must be references. The count is compared outside of
    # the grep list, and REF is accepted as a reference to a scalar holding a
    # compiled regex is a REF
    return &Error("Unsupported filtering mode")
        unless ( 3 == grep { ref($_) =~ /^(?:SCALAR|REF)/ }
            $day_filter, $job_filter, $status_filter ) ;

    # Filter on the day is done only
    my $day = $$day_filter ;
    return 1 if ($day and ! defined($self->{'ABTERM'})
        and $self->is_done and $self->{'DAY'} !~ /^$day$/);

    # Filter on status
    if ( $$status_filter ) {
        my ( $not , $filter ) = $$status_filter =~ /^(!?)(.*)$/ ;
        if ($filter = qr/^$filter$/i) {
            return 1 if (($not and $self->{'STATUS'} =~ $filter)
                or (! $not and $self->{'STATUS'} !~ $filter));
        }
    }

    my $wanted = $$job_filter ;
    return 0 unless ($wanted);

    # We are filtering on a regex
    my $filter = ref($wanted) =~ /^regex/i ? $wanted : qr/$wanted/i ;

    return ( $self->{'AFP'} !~ $filter and $self->{'JOBID'} !~ $filter );
}

# Return 1 when we are at step 12 and this step has the 'o' state
sub is_finished {
    return ( $_[0]->{STEP} == 12 and $_[0]->step_status(12) eq 'o' ) ? 1 : 0 ;
}

# is_job( [JOBNAME] )
# Return our job name, after updating it when a defined one is given
sub is_job {
    my $self = shift ;
    $self->{JOB} = shift if (@_ and defined($_[0]));
    return $self->{JOB} ;
}

# is_tied_timer( [TIME] )
# Return the time of our last save in tied hash (0 when not set), after
# updating it when given
sub is_tied_timer {
    my $self = shift ;
    $self->{TIED_TIMER} = shift if @_ ;
    return $self->{TIED_TIMER} || 0 ;
}

# is_tied( [SID_OR_KEY] )
# Return the A2P::Status object thawed from the tied hash for the given key
# (ours by default), or 0 when it can't be retrieved. Keys found to be
# references to a missing status are removed from the tied hash.
sub is_tied {
    # Have to update us against tied version
    my $self = shift ;

    my $sidkey = $_[0] || $self->get_sid_key(@_) ;

    return 0 unless (exists($TIEHASH->{$sidkey}));

    #&StatusDebug("Controling $sidkey from tied hash");

    # A number as value is a reference to a SID: follow it
    while ($TIEHASH->{$sidkey} =~ /^\d+$/) {
        &Error("$sidkey sidkey is not status but reference to " .
            $TIEHASH->{$sidkey});

        my $rev_key = $self->get_sid_version_key($sidkey) ;
        if (exists($TIEHASH->{$rev_key})) {
            &Info("$rev_key revision key exists for $sidkey, removing it...");
            delete $TIEHASH->{$rev_key} ;
        }

        my $newkey = $self->get_sid_key($TIEHASH->{$sidkey}) ;
        unless (exists($TIEHASH->{$newkey})) {
            delete $TIEHASH->{$sidkey} ;
            &Error("Bad reference to $sidkey");
            # Keep the API contract: 0 or an A2P::Status object
            return 0 ;
        }

        $sidkey = $newkey ;
    }

    &TIMESTAT('IS-TIED-API');

    # Work-around to a possible concurrencing case when key is updated in
    # another process after a split job event, fix a a2p-status service crash
    # TODO reproduce the case to found a better resolution
    my $newself = $TIEHASH->{$sidkey} ;
    my $tries = 10 ;
    while ( $newself !~ /^\x04/ and $tries -- ) {
        usleep($USLEEP);
        $newself = $TIEHASH->{$sidkey} ;
    }

    if ( $newself =~ /^\x04/ ) {
        my $frozen = $newself ;

        # Trap any thaw failure here. The die handler is only disabled for
        # this block so the handler set by the process is kept untouched.
        local $SIG{'__DIE__'} = 'DEFAULT' ;
        $newself = eval { &thaw($frozen) } ;
        &Error("Can't thaw $sidkey from tied hash: $@") if ($@);
    }

    if (ref($newself) =~ /^A2P::Status/) {
        #&StatusDebug("Retrieved $sidkey from tied hash");

        # Control known SID
        $self->set_sid_index( $self->db_sid )
            unless ( $self->get_sid_index eq $self->db_sid );

    } else {
        &StatusDebug("Can't retrieved $sidkey as A2P::Status from tied hash");
        $newself = 0 ;
    }

    &TIMESTAT('IS-TIED-API');

    return $newself ;
}

# Remove our status, its versioning and any key referencing it, directly or
# through a chain of references, from the tied hash. Nothing is done when we
# are not the tied file writer or when our key is not in the tied hash.
sub remove_from_tied_hash {
    # Still return if we don't have write access to tied file
    return unless (_is_tie_writer());

    my $self = shift ;

    my $sid_key = $self->get_sid_key ;

    # Remove from tied hash if really in tied hash
    return unless (exists($TIEHASH->{$sid_key}));

    &TIMESTAT('REMOVE-FROM-TIED');

    &StatusDebug("Removing $sid_key from tied hash");

    delete $TIEHASH->{$sid_key} ;
    delete $TIEHASH->{$self->get_sid_version_key($sid_key)} ;

    # Remove DB hash entry
    $self->_DB_remove ;

    # Remove any other reference to us, maybe chained references
    my %remove_it = ( $sid_key => 1 );
    my %not_ref_to = () ;

    while ( my ( $key, $value ) = each(%{$TIEHASH}) ) {
        # Skip well known not reference keys
        next if ( $key =~ /_rev$/ );

        my $next = $self->get_sid_key($value) ;

        # SID keys and frozen values are status, not references
        if ( $key =~ /^_SID_/ or $value =~ /\x04/
            or exists($not_ref_to{$next}) or exists($not_ref_to{$key}) ) {
            $not_ref_to{$key} = 1 ;
            next ;
        }

        # Remove this key if it points to a well known reference to sid_key
        if ( exists($remove_it{$next}) ) {
            $remove_it{$key} = 1 ;

        } else {
            # Check chained reference
            my %list = ( $key => 1 ) ;

            while ( $value ) {
                $value = $TIEHASH->{$next} || '' ;

                if ( exists($remove_it{$next}) ) {
                    $remove_it{$_} = 1 for keys(%list) ;
                    last ;

                } elsif ( $next =~ /^_SID_/ or $value =~ /\x04/
                    or exists($not_ref_to{$value}) ) {
                    $not_ref_to{$_} = 1 for keys(%list) ;
                    last ;
                }

                # Avoid looping infinitely on cross reference
                last if (exists($list{$next}));

                $list{$next} = 1 ;
                $next = $self->get_sid_key($value) ;
            }
        }
    }

    # Now delete the found references, removing referenced key
    delete $remove_it{$sid_key} ;
    delete $TIEHASH->{$_} for keys(%remove_it) ;

    &TIMESTAT('REMOVE-FROM-TIED');
}

sub remove_from_db {
    my $self = shift ;

    # Delete the row and eventually optimize the table
    &job_status_delete($self->get_sid_index) if ($self->get_sid_index);
}

# Remove every reference to ourself from the cache and from the SID cache
sub remove_from_cache {
    my $self = shift ;

    # Delete any used key pointing to ourself
    delete $CACHE->{$_}
        for grep { $CACHE->{$_} == $self } keys(%{$CACHE}) ;

    # Check any SID pointing to ourself
    delete SID_CACHE->{$_}
        for grep { SID_CACHE->{$_} == $self } keys(%{&SID_CACHE}) ;

    # Here cache should not have any reference to us, will die here
    $self->is_cached(0);
}

# Return a deep copy of ourself made through Storable
sub clone {
    return &thaw(&freeze($_[0])) ;
}

# checked_in_db_timer( [FLAG] )
# Return the time of the last check in DB, after setting it to the current
# time when FLAG is true. A false FLAG leaves the stored time as is.
sub checked_in_db_timer {
    my $self = shift ;

    my @v = @_ ? ( $_[0] ? &ms() : 0 ) : () ;

    return $self->_DB_this( 'CHK', 0, @v );
}

# Reload our members from the DB when is_job_status_updated returns a row for
# our SID and revision, at most one time by minute. Returns the number of
# updated members.
sub check_update_in_db {
    my $self = shift ;

    my $sid = $self->get_sid_index || $self->db_sid ;
    unless ($sid) {
        # Don't check if no SID available
        &UPSTAT('STATUS-BAD-CHECK-UPDATE');
        return 0 ;
    }

    # Don't recheck in DB too quickly, one time by minute is sufficient
    return 0 if ($self->checked_in_db_timer > &ms(-60));

    # Update checked in db timer to current time
    $self->checked_in_db_timer(1);

    my $updated = 0 ;

    # Check with the SID validated above: our own index can still be unset
    # while the SID is yet known from the DB informations
    my @row = &is_job_status_updated($sid,$self->get_revision);
    if (@row) {
        # Update ourself
        my @keys = qw(
            LOCKID JOBID REV TIMER STEP STATE BORN DAY AFP STATUS INFOS
        );

        foreach my $key (@keys) {
            my $value = shift @row ;
            unless (defined($value)) {
                &StatusDebug("Bad update on $key value");
                last ;
            }

            if ($key eq 'STATE') {
                $self->{$key} = [ split(//,$value) ] ;

            } else {
                $self->{$key} = $value ;
            }

            $updated ++ ;
        }

        &StatusDebug("Updated $updated values from DB for job ".$self->{JOBID});

        $self->set_dirty_bit ;
    }

    return $updated ;
}

# Key of our entry in the DB informations hash
sub _DB_name {
    return $_[0]->{JOBID} ;
}

# _DB_this( KEY [, DEFAULT [, VALUE]] )
# Return the DB information stored under KEY for our job, after updating it
# when a defined VALUE is given. DEFAULT (0 when false or not given) is
# returned when nothing is stored.
sub _DB_this {
    my $self = shift ;
    my $key = shift or return 0 ;
    my $def = shift || 0 ;

    my $job = $self->_DB_name ;

    $DB->{$job} = {} unless (exists($DB->{$job}));

    $DB->{$job}->{$key} = shift if ( @_ and defined($_[0]) );

    return exists($DB->{$job}->{$key}) ? $DB->{$job}->{$key} : $def ;
}

sub _DB_remove {
    # Free memory to be called when no more needed. Should be called when the
    # comparison object is no more available in tied hash
    my $self = shift ;

    &UPSTAT('STATUS-DB-REMOVE');
    &UPSTAT('STATUS-DB-REMOVE-AGE',$self->real_age);

    delete $DB->{$self->_DB_name} if (exists($DB->{$self->_DB_name}));
}

# db_revision( [REVISION] )
# Return the revision known as stored in DB, after updating it when given
sub db_revision {
    my $self = shift ;
    return $self->_DB_this( 'REV', 0, @_ );
}

# db_sid( [SID] )
# Return the SID known from the DB, after updating it when given
sub db_sid {
    my $self = shift ;
    return $self->_DB_this( 'SID', 0, @_ );
}

# Keep the current time as last synchronization time and return the updated
# count of checks
sub db_checked {
    my $self = shift ;
    $self->_DB_this( 'SYNC_TIMER', 0, &ms() );
    return $self->_DB_this( 'CHECKED', 0, $self->_DB_this( 'CHECKED', 0 )+1 );
}

# Return 0 when more than 500 (in ms() units) elapsed since the last
# synchronization time, 1 otherwise
sub checking_too_quickly {
    return ( &ms() - $_[0]->_DB_this( 'SYNC_TIMER', 0 ) > 500 ) ? 0 : 1 ;
}

# Return 1 when our revision is greater than the one known in DB
sub is_newer_than_db {
    my $self = shift ;
    return ( $self->db_revision < $self->get_revision ) ? 1 : 0 ;
}

# Write our status to the DB when our revision is newer than the DB one,
# otherwise check if an ABTERM status was updated in DB when dirty bit 3 is
# unset. Returns 1 when the DB revision is then at least ours, 0 otherwise.
sub sync_with_db {
    my $self = shift ;

    my $job = $self->is_job ;

    $! = 0 ;

    # Get the current SID for that status
    my $sid = $self->get_sid_index || $self->db_sid ;

    # We update when we know DB rev is lower than current from cache
    if ( $self->is_newer_than_db ) {
        # Prepare row for update
        my @row = ( $sid ) ;
        push @row, map { defined($self->{$_}) ? $self->{$_} : "NULL" }
            qw{ LOCKID JOBID REV TIMER STEP STATE BORN DAY AFP STATUS NBJOBS };

        # Add DestId list
        my %destids = () ;
        my @jobs = defined($self->{JOBS}) ?
            grep { defined($_) } @{$self->{JOBS}} : () ;
        foreach my $jid (@jobs) {
            next unless (defined($self->{$jid}->{DESTID}));
            $destids{uc($self->{$jid}->{DESTID})} = 1 ;
        }

        if (exists($destids{NULL})) {
            # Protect NULL DestID if defined
            delete $destids{NULL} ;
            $destids{'(NULL)'} = 1 ;
        }

        push @row, join(" ",keys(%destids)) || "none" ;
        push @row, $self->{INFOS} || "" ;

        # Check if update is done and we can continue the process
        $sid = &job_status_update(@row) ;
        unless ( $sid and ($sid == $self->db_sid or $self->db_sid($sid))) {
            &Debug("Can't sync $job with @row to db row sid ".$sid);
            return 0 ;
        }

        # Update revision
        $self->db_revision($self->get_revision);

    } elsif ( ! $self->dirty_bit(3) ) {
        # Check if ABTERM status was updated by another service, this API do it
        # at last one time by minute. We don't need to check GOOD status as they
        # can't be updated to better status
        &UPSTAT('STATUS-UPDATED-FROM-DB')
            if ( $self->is_abterm and $self->check_update_in_db );
    }

    # Update our cache if gotten SID is different, check it as
    # string as SIDs from standard thread are string by default (afp jobname)
    if ( $self->get_sid_index ne $self->db_sid ) {
        $self->set_sid_index( $self->db_sid );
    }

    # Keep statistics on control
    &MAXSTAT('STATUS-RESYNCH-CHECKED',$self->db_checked);

    # TODO Find why some time db_revision is greater than get_revision, maybe
    # because a status update is done by a process on older status in its cache
    unless ($self->db_revision and $self->db_revision >= $self->get_revision) {
        &Debug("$job DB revision is " . $self->db_revision .
            " but ours is " . $self->get_revision);
        return 0 ;
    }

    return 1 ;
}

# get_next_from_db( ARGS )
# Turn ourself into the job returned by get_next_jobstatus for our SID and
# the given arguments. Returns 1, or 0 when no row is returned.
sub get_next_from_db {
    my $self = shift ;

    # Get the next row as a hash with column names as keys
    my $row = &get_next_jobstatus( $self->get_sid_index, @_ ) ;

    return 0 unless (ref($row) =~ /^HASH/);

    # Invalidate ourself with empty JobID
    $self->{JOBID} = '' ;
    $self->is_job('INVALID JOB ROW');

    # Update ourself to job in DB values
    foreach my $key (keys(%{$row})) {
        if ($key eq 'STATE') {
            $self->{STATE} = [ split(//,$row->{STATE}) ] ;

        } else {
            $self->{$key} = $row->{$key} ;
        }
    }

    # Keep who we are now as valid job
    $self->is_job($self->{JOBID}) if ($self->{JOBID});

    1 ;
}

# Increment our revision and return it
sub set_revision {
    return ++ $_[0]->{REV} ;
}

sub get_revision {
    return $_[0]->{REV} ;
}

# save_tied_versioning( [SID_OR_KEY] )
# Store our revision, lock id and cache timer as a frozen array under the
# versioning key of the given key (ours by default)
sub save_tied_versioning {
    # We assume we can write to tied hash
    my $self = shift ;

    my $version = [ $self->get_revision, $self->get_lockid, $self->timer ] ;

    #&StatusDebug("Saving ".$self->is_job." rev. ".$self->get_revision.
    #    " versioning at key ".$self->get_sid_version_key(@_));

    return $TIEHASH->{$self->get_sid_version_key(@_)} = &freeze($version);
}

# is_tied_versioning( [SID_OR_KEY] )
# Return the ( revision, lock id, timer ) list stored in tied hash for the
# given key, or ( 0, $LOCKID, 0 ) when the versioning key is missing
sub is_tied_versioning {
    my $self = shift ;

    my $key = $self->get_sid_version_key(@_) ;

    unless (defined($TIEHASH->{$key})) {
        &StatusDebug("Versioning key $key is lost");
        return ( 0, $LOCKID, 0 );
    }

    my @version = @{&thaw($TIEHASH->{$key})};

    return @version ;
}

# is_older_than_tied( [SID_OR_KEY] )
# Return the tied revision when the tied version is newer than us, 0
# otherwise. We are flagged as dirty when our revision is the newer one.
sub is_older_than_tied {
    # When comparing with tied version, we should check first the timer when
    # checking on the same LOCKID assuming this is the same service in the
    # same place
    my $self = shift ;

    my ( $rev, $lockid, $timer ) = $self->is_tied_versioning(@_) ;

    # Set ourself as dirty cached status if we are newer than tied version
    $self->set_dirty_bit if ($self->get_revision > $rev);

    #&StatusDebug("Comparing to @_ tied rev $rev, lockid $lockid, timer $timer");

    # The stored lock id is quoted to be compared as a string, not a pattern
    my $same_lock = ( $LOCKID =~ /^\Q$lockid\E$/ ) ;

    return (( $same_lock and $self->timer < $timer )
        or $self->get_revision < $rev ) ? $rev : 0 ;
}

# Return our status in upper case ("" when not defined), also stored back
sub status {
    my $self = shift ;
    return $self->{STATUS} =
        defined($self->{STATUS}) ? uc($self->{STATUS}) : "" ;
}

# step_status( STEP [, STATE] )
# Return the state of the given step ('_' in place of '0'), after trying to
# update it when STATE is given. Returns 0 for a step out of the 0-12 range
# or when a starting state is refused because of the previous step state.
sub step_status {
    my $self = shift ;

    # Remove any undefined value, arise when starting a job
    while (@_ and !defined($_[0])) { shift }

    my $Step = shift || 0 ;

    # Check step value to avoid updating job status on unused step
    return 0 if ($Step < 0 or $Step > 12 );

    if (@_) { # Set step and later keep us in cache
        my $State = shift || '.' ;

        # Check we can update status
        if ( $Step < 3 and $State =~ /^[._]$/ ) {
            my $prev = $self->{STATE}->[$Step-1] || '0' ;

            # Step starting, previous step must be 'o', '0', '_', '-' or 'A'
            return 0 unless ( $prev =~ /^[0Ao_-]$/ );

        } elsif ( $Step == 1 and $State eq 'o' ) {
            # First call initializing a basic job status entry. This is
            # done the first time in SpoolManager when processing AFP file
            my $last = $self->{STATE}->[1] ;

            # Check job has still been started
            my $restarted = (defined($last) and $last ne '_') ?
                (exists($self->{FILE}) ? $self->{FILE} : $self->{AFP}) : '' ;

            &Debug("Starting job " . $self->is_job .
                ($restarted?"... ($restarted)":""));

            # Initializes our birth and keep us in cache
            $self->_set_defaults ;
            $self->_born ;

            # Check to remove old job status from previous start
            if ($restarted) {
                $self->{RESTARTED} = time ;
                $self->{RESTART} = exists($self->{RESTART}) ?
                    ++ $self->{RESTART} : 1 ;

                # Set first step to number indicates a restart (with 0 -> >10)
                $self->{STATE}->[0] =
                    $self->{RESTART} > 9 ? 0 : $self->{RESTART} ;

                &Info("Seems to restart $restarted job".($self->{RESTART}>1?
                    " (restart #".$self->{RESTART}.")":""));
            }

            # Set this status is new to being handled quickly
            $self->set_is_new ;

        } elsif ( $Step == 1 and $State eq 'V' ) {
            # Specific to validation: force as new
            $self->set_is_new ;
            $State = 'o' ;
        }

        # Don't update STEP if still at higher step (protection)
        $self->{STEP} = $Step unless ( $self->{STEP} > $Step );

        # Update STATE checking if we can really update it
        $self->{STATE}->[$Step] = $State
            unless ( $self->{STATE}->[$Step] eq 'o'
                or ( $self->{STATE}->[$Step] eq 'A' and $State ne 'o' ) );

        # Now set cache is dirty
        $self->set_dirty_bit ;
    }

    # Still return the current step
    return $self->{STATE}->[$Step] eq '0' ? '_' : $self->{STATE}->[$Step] ;
}

# Flag the status as new: set dirty bit 3, push our key on the shared list
# of our lock id and forget any stored error and child job informations
sub set_is_new {
    my $self = shift ;

    # 1. Set the bit 3 to guaranty the status is not removed from tied hash
    #    Before it is in DB
    $self->set_dirty_bit(3);

    # 2. Use shared list of new status to inform a2p-status to handle this one
    #    This is done in SpoolManager thread
    my $shared_list = &SharedList( $self->get_lockid );
    push @{$shared_list}, $self->get_sid_key ;
    &FreeSharedList( $self->get_lockid );

    # 3. Remove any stored Error or ABTERM
    delete $self->{ABTERM} ;
    delete $self->{ERRORS} ;

    # 4. Remove any child error
    if (exists($self->{JOBS}) and ref($self->{JOBS}) =~ /^ARRAY/i) {
        foreach my $jid (@{$self->{JOBS}}) {
            next unless (defined($jid) and $jid and exists($self->{$jid}));
            delete $self->{$jid} ;
        }
        delete $self->{JOBS} ;
    }
}

# save_tied( ['not a revision'] )
# Increment our revision, unless called with the 'not a revision' string,
# then store ourself frozen in the tied hash with our versioning and make our
# JOB, AFP, JOBID and LINKEDAFP names reference it. Returns ourself, or 0
# when we are not the tied file writer.
sub save_tied {
    my $self = shift ;

    # Update our revision before trying to save it in tied hash
    # But disable setting revision if requested for the case of just renaming
    # status in tied hash
    $self->set_revision() unless ( @_ and $_[0] =~ /^not a revision$/ );

    # Check we are really tied
    return 0 unless ( _is_tie_writer() );

    &TIMESTAT('SAVE-TIED');

    # Reference is a number when we are knowing the SID index in DB
    # otherwise it's the first inserted key in tied hash given by get_sid_key
    my $sid_ref = $self->get_sid_index || $self->get_sid_key ;

    # Update any important reference. The reference is quoted as job names
    # can contain chars with a meaning in a pattern
    my $sidkey_re = qr/^\Q$sid_ref\E$/ ;
    foreach my $ref (qw{ JOB AFP JOBID LINKEDAFP }) {
        # Skip undefined and not set
        next unless (defined($self->{$ref}) and $self->{$ref});

        # Skip the SID key itself
        next if ( $self->{$ref} =~ $sidkey_re );

        # Skip if ref is still set
        next if ( defined($TIEHASH->{$self->{$ref}})
            and $TIEHASH->{$self->{$ref}} =~ $sidkey_re );

        # Then set reference
        $TIEHASH->{$self->{$ref}} = $sid_ref ;
        #&StatusDebug($self->{$ref}." replaced");

        # Delete old versioning
        my $old_version_key = $self->{$ref} . _rev_ext() ;
        if (exists($TIEHASH->{$old_version_key})) {
            delete $TIEHASH->{$old_version_key} ;
            #&StatusDebug($self->{$ref}._rev_ext." deleted");
        }
    }

    # Reset cached status
    my $cached = $self->is_cached ;
    $self->is_cached(0);
    $self->unset_dirty_bit ;

    # Update tied timer before freeze
    $self->is_tied_timer(&ms());

    # Prepare tied value as Storable freeze with DB information
    my $freeze = &freeze($self) ;

    # Keep statistics on stored length in tied hash
    &MAXSTAT('MAX-TIED-STATUS-LENGTH',length($freeze));
    &UPSTAT('SAVE-TIED');

    # Save tied value as Storable freeze
    my $sidkey = $self->get_sid_key ;
    $TIEHASH->{$sidkey} = $freeze ;
    $self->save_tied_versioning($sidkey) ;

    &Warn("$sidkey SID versioning still lost")
        unless (defined($TIEHASH->{$sidkey . _rev_ext()}));

    #&StatusDebug($self->is_job." saved in tied for key $sidkey rev. " .
    #    $self->get_revision);

    # Reset cached status and others
    $self->is_cached($cached);

    &TIMESTAT('SAVE-TIED');

    return $self ;
}

sub DESTROY {
    # Remove debug message for minimal object
    $MUSTQUIT and &StatusDebug("Freeing " . __PACKAGE__ . " object memory " .
        $_[0]->get_sid_index . ":" . $_[0]->is_job .
        ( defined($_[0]->{INITIAL}) ? " (" . $_[0]->{INITIAL} . ")" : "" ));
}

&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");

1;