#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: JobStatus.pm 3 2007-10-18 16:20:19Z guillaume $
#
# Some job status subfunctions callable from a2p, a2p-status and webmin modules
#

package A2P::JobStatus;

use strict;
use Fcntl;
use Time::HiRes qw( usleep gettimeofday tv_interval);
use Fcntl qw(:flock F_SETLK F_WRLCK SEEK_SET);
use File::stat;
use IO::Socket;
use GDBM_File;
use A2P::Globals;
use A2P::Syslog;
use A2P::Status;
use A2P::Tools qw( SharedList FreeSharedList ms );

BEGIN {
    use Exporter ();

    our ( $VERSION, @ISA, @EXPORT_OK );

    $VERSION = sprintf "%s", q$Rev: 986 $ =~ /(\d[0-9.]+)\s+/ ;

    @ISA = qw(Exporter);

    @EXPORT_OK = qw(
        &sync_dbm_to_db &a2pjobstate &otherjobstate &getstatus_from_db
        &get_status
    );
}
our $VERSION;

my %TIED_STATUS = ();    # Tied hash
my %STATUS      = ();    # Status object cache
my $LCK;                 # Lock file handle, defined only while opened
my $CURRENTMODE = 0;     # Access mode to tie: 0 = RO, 1 = R/W

# Regex matching a tied value which is a freezed A2P::Status object
sub _freezed_status_re {
    qr/^\x04.*A2P::Status/
}

# Accessor: reference to the (possibly tied) status hash
sub _get_tied_ref {
    return \%TIED_STATUS;
}

# Return 1 when %TIED_STATUS is currently not tied, 0 otherwise
sub _not_tied {
    return tied(%TIED_STATUS) ? 0 : 1;
}

# Accessor: reference to the status object cache
sub _get_cache_ref {
    return \%STATUS;
}

# Return the current access mode (true when we can write to the tied hash).
# The mode is forced back to read-only when the hash is not tied anymore.
sub cansavetied {
    $CURRENTMODE = 0 if _not_tied();
    return $CURRENTMODE;
}

# Update a job status from a2p. Arguments are passed as is to
# tied_update_status(). Returns 1 without doing anything unless both
# $SCAN_SPOOL and $KEEP_JOBSTATUS are set, otherwise returns the
# tied_update_status() result.
sub a2pjobstate {
    return 1 unless ( $SCAN_SPOOL and $KEEP_JOBSTATUS );

    &TIMESTAT('UPDATE-JOBSTATE');

    my $ret = &tied_update_status(@_);

    # 9. Call the cache cleaner after the status is updated
    &cache_cleaner();

    &TIMESTAT('UPDATE-JOBSTATE');

    return $ret;
}

# Update a job status from another tool. Returns 0 when the gdbm file for
# $LOCKID doesn't exist, otherwise returns the tied_update_status() result.
sub otherjobstate {
    # Check gdbm file exists before update specially if we are root !!! to
    # avoid create an unreadable gdbm file for services
    return 0 unless -e ( &dbm_base() . $LOCKID );

    return &tied_update_status(@_);
}

# Base path of "new status" files, to be completed with a lockid
sub newstatus_dbm_base {
    return $SHMDIR . '/.new-status-';
}

# Base path of jobstatus gdbm files, to be completed with a lockid
sub dbm_base {
    return $SHMDIR . '/.jobstatus-';
}

# Try to get an exclusive lock on "$file.LCK".
# Arguments: the gdbm file path and an optional lockid (default: $LOCKID).
# Returns 1 when the lock was obtained, 0 after 5 failed non-blocking tries
# (the lock file is then closed), or the &Error() result when the lock file
# can't be opened.
sub dbm_lock {
    my $file   = shift;
    my $lockid = shift || $LOCKID;

    return &Error("Can't open lock file to update $file job status: $!")
        unless ( open( $LCK, '>', $file . '.LCK' ) );

    my $tries = 0;
    while ( $tries < 5 ) {
        # Lock status update as only one thread should update status at a time
        if ( flock( $LCK, LOCK_EX | LOCK_NB ) ) {
            &MAXSTAT( 'STATUS-LOCK-RETRY', $tries );
            return 1;
        }
        usleep( $USLEEP >> 2 );
        $tries++;
    }

    close($LCK);
    undef $LCK;

    &MAXSTAT( 'STATUS-LOCK-RETRY', $tries );

    return 0;
}

# Release the lock taken by dbm_lock() and reset the access mode to RO.
# Always returns 1, even when no lock was held.
sub dbm_unlock {
    my $lockid = shift || $LOCKID;

    return 1 unless ( defined($LCK) );

    flock( $LCK, LOCK_UN );
    $CURRENTMODE = 0;
    close($LCK);
    undef $LCK;

    # Key used by A2P::Status API to know if it can write to tiehash
    delete $STATUS{__GDBM_WRITER__};

    return 1;
}

my $search_status = '';    # Name searched by get_status() but not yet found

# Search the cache for a status object by job name, JOBID or AFP value.
# Returns the status object when found, 0 otherwise. When not found, the
# name is kept so sync_lockid() can keep the object in cache when met.
sub get_status {
    my $name = shift;

    my $found;

    # Parse each status in cache
    # (the original "foreach ... = each(%STATUS)" only ever looked at a
    # single cache entry per call)
    foreach my $key ( keys(%STATUS) ) {
        my $status = $STATUS{$key};
        next unless ( &is_status($status) );
        if (   $status->is_job eq $name
            or $status->{JOBID} eq $name
            or $status->{AFP} eq $name )
        {
            $found = $status;
            last;
        }
    }

    if ($found) {
        # Found: we don't need to search for it in next list scans
        $search_status = '' if ( $search_status eq $name );
        return $found;
    }

    # Set we are searching for this job for next list scan if not found
    &StatusDebug("Searching for status object of $name");
    $search_status = $name;

    return 0;
}

my @sync_list = ( {}, {}, {}, {} );    # Array of 4 hash refs
# 0 -> Lists to sync cache by lockid (keys like __.*__ are based on cache keys)
# 1 -> Lists of expected keys to be synchronised
# 2 -> Lists of timer for resynchronization
# 3 -> Lists of keys to recheck as writer

# Return a limited list of tied hash keys to check in sync_lockid() for the
# given lockid (default: $LOCKID). Keys from the expected (short) list are
# returned first, then completed with keys from the complete list, which is
# repopulated from %TIED_STATUS each time it gets empty.
sub sync_list {
    my $lockid = shift || $LOCKID;

    my ( $lists, $shorts, $timings ) = @sync_list;

    $lists->{$lockid} = [] unless ( exists( $lists->{$lockid} ) );
    my $list = $lists->{$lockid};

    $shorts->{$lockid} = {} unless ( exists( $shorts->{$lockid} ) );
    my $shortlist = $shorts->{$lockid};

    # Check if we need to repopulate the list
    unless ( @{$list} ) {

        # Get timing
        $timings->{$lockid} = [ 0, 0 ]
            unless ( exists( $timings->{$lockid} ) );
        my $timing = $timings->{$lockid};

        if ( $timing->[0] ) {
            # Check how many time has passed to come back here: stop chrono
            &TIMESTAT( 'SYNC-LIST-PROCESSED-' . $lockid );

            # Update some statistics
            my $delay = time() - $timing->[0];
            &MAXSTAT( 'SYNC-LIST-RATE-' . $lockid,
                sprintf( '%.2f', $timing->[1] / $delay ) )
                if $delay;

            $timing->[0] += $delay;    # Update to current time
            $timing->[1] = 0;          # Init list count
        }
        else {
            # Initialize processing timer
            $timing->[0] = time();     # Set to current timer
            $timing->[1] = 0;          # Init list count
        }

        &TIMESTAT( 'SYNC-LIST-REPOPULATE-' . $lockid );
        &UPSTAT( 'SYNC-LIST-REPOPULATE-' . $lockid );

        # Repopulate list
        push @{$list}, grep { &is_tied_status($_) } keys(%TIED_STATUS);

        $timing->[1] += @{$list};

        &MAXSTAT( 'SYNC-LIST-COUNT-' . $lockid, $timing->[1] );
        &MAXSTAT( 'SYNC-LIST-REPOPULATED-' . $lockid, scalar( @{$list} ) );
        &TIMESTAT( 'SYNC-LIST-REPOPULATE-' . $lockid );

        # Start the come back chrono
        &TIMESTAT( 'SYNC-LIST-PROCESSED-' . $lockid );
    }

    # Get list count to return
    my $return_count = $MAX_CONV_RATE > 0 ? $MAX_CONV_RATE : 20;

    # Synchronize quickly the short/expected list
    my @list      = ();
    my @shortlist = keys( %{$shortlist} );
    my $sync_shortlist =
        @shortlist < $return_count ? scalar(@shortlist) : $return_count;

    if ($sync_shortlist) {
        # Update @list with shortlist and remove shortlist entries
        foreach my $key ( splice @shortlist, 0, $sync_shortlist ) {
            push @list, $key;
            delete $shortlist->{$key};
        }
    }

    &MAXSTAT( 'SYNC-LIST-SHORT-' . $lockid, $sync_shortlist );

    # Synchronize slowly the complete list: at least one key by call
    my $sync_longlist = @list < $return_count ? $return_count - @list : 1;

    # Update list with slow list elements
    if ( @{$list} ) {
        my $take =
            $sync_longlist > @{$list} ? scalar( @{$list} ) : $sync_longlist;
        push @list, splice( @{$list}, 0, $take );
    }

    &MAXSTAT( 'SYNC-LIST-LONG-' . $lockid, $sync_longlist );
    &MAXSTAT( 'SYNC-LIST-' . $lockid,      scalar(@list) );

    return @list;
}

# Empty the given array (passed by reference) by re-injecting its elements
# in the expected keys list of the lockid (default: $LOCKID).
sub sync_list_inject_again {
    my $lockid = shift || $LOCKID;
    my $array  = shift || [];    # "|| ()" was giving undef, fatal on deref

    return unless @{$array};

    &UPSTAT( 'SYNC-INJECT-AGAIN-' . $lockid );
    &MAXSTAT( 'SYNC-INJECTED-AGAIN-' . $lockid, scalar( @{$array} ) );

    foreach my $key ( splice @{$array} ) {
        &Debug("Inject $key in $lockid shortlist") if ($ADVANCED_DEBUGGING);
        $sync_list[1]->{$lockid}->{$key} = 1;
    }

    return;
}

# Register a key in the expected keys list of a lockid so it is synched
# quickly on next sync_list() calls.
sub in_expected_sync_list {
    my $lockid = shift;
    my $key    = shift;

    $sync_list[1]->{$lockid} = {}
        unless ( exists( $sync_list[1]->{$lockid} ) );

    $sync_list[1]->{$lockid}->{$key} = 1;

    &Debug("Expecting synchronization on $key in $lockid shortlist")
        if ($ADVANCED_DEBUGGING);

    &UPSTAT( 'EXPECTED-SYNC-ADDED-' . $lockid );
}

# Return the list of new status found in the SharedList of the lockid
# (default: $LOCKID). The elements are removed from the shared list.
sub sync_new_list {
    my $lockid = shift || $LOCKID;

    my @new_status = ();

    my $shared_list = &SharedList($lockid);

    # Only shift the elements counted when starting, as the original did
    foreach my $index ( 0 .. $#{$shared_list} ) {
        my $status = shift @{$shared_list};
        push @new_status, $status;
        &Debug("Got '$status' from '$lockid' SharedList at #$index")
            if ($ADVANCED_DEBUGGING);
    }

    &FreeSharedList($lockid);

    return () unless @new_status;    # Avoid statistics if nothing new found

    &UPSTAT( 'SYNC-NEW-LIST-' . $lockid );
    &MAXSTAT( 'SYNC-NEW-LIST-COUNT-' . $lockid, scalar(@new_status) );

    return @new_status;
}

# Tie %TIED_STATUS to the gdbm file of the lockid (default: $LOCKID).
# Second argument is the wanted mode: 0 = reader, 1 = writer (default).
# Writer mode is only kept if the exclusive lock is obtained, check it with
# cansavetied(). Returns the tie() result, so undef on failure.
sub tie_status_hash {
    my $lockid = shift || $LOCKID;
    $CURRENTMODE = shift;
    $CURRENTMODE = 1 unless ( defined($CURRENTMODE) );

    my $file = &dbm_base() . $lockid;

    # Synchronize mode with lock status, we can only write if we have an
    # exclusive lock on the file
    $CURRENTMODE = $CURRENTMODE ? &dbm_lock( $file, $lockid ) : 0;

    if ( tied(%TIED_STATUS) ) {
        &UPSTAT( 'WARNING-UNEXPECTED-' . $lockid . '-UNTIE-NEEDED' );
        &StatusDebug("Can't tie again hash, trying to untie first");
        untie %TIED_STATUS;
    }

    # Tie STATUS hash with read/write access
    my $ret = tie(
        %TIED_STATUS, 'GDBM_File', $file,
        ( $CURRENTMODE ? GDBM_WRCREAT | GDBM_NOLOCK : GDBM_READER ),
        oct(660)
    );

    # Inform A2P::Status API it can write in tiehash
    if ( defined($ret) and $ret == tied(%TIED_STATUS) ) {
        $STATUS{__GDBM_WRITER__} = $CURRENTMODE;
    }
    else {
        delete $STATUS{__GDBM_WRITER__};
    }

    return $ret;
}

my $shrink_timer = {};    # Timers to reorganize gdbms

# Reorganize the currently tied gdbm file of the lockid when it is big
# enough and was not updated recently. Returns 0 when nothing was tried,
# 1 otherwise.
sub _shrink_gdbm_file {
    my $lockid = shift;

    # Can only shrink GDBM files at least each minute
    return 0
        if ( exists( $shrink_timer->{$lockid} )
        and tv_interval( $shrink_timer->{$lockid} ) < 60 );

    # Get file statistics (File::stat object)
    my $stat = stat( &dbm_base() . $lockid )
        or return 0;

    # Check if we can shrink GDBM file each minute if file is greater than 1Mb
    my $delay = $stat->size > 1024000 ? 60 : 600;    # default is 10 minutes
    return 0
        if ( exists( $shrink_timer->{$lockid} )
        and tv_interval( $shrink_timer->{$lockid} ) < $delay );

    my $stat_tag = 'REORGANIZE-GDBM-' . $lockid;
    &UPSTAT( $stat_tag . '-CHECK' );

    # Get file time update and apply factor relative to file size
    my $condition = ( time() - $stat->mtime ) * int( $stat->size / 512000 );

    # Condition is equivalent to a GDBM file not used since at least 40 seconds
    # if the file size is about 0,5-1 Mb. Lower size won't be shrinked. Greater
    # will be shrink in a less time interval. On a high load, the shrink will
    # be delayed until activity is reduced on a minimum basis: at least 1 second
    # of inactivity and file size is greater than 20 Mbytes
    if ( $condition > 40 ) {
        $! = 0;
        &UPSTAT( $stat_tag . '-TRY' );
        &MAXSTAT( $stat_tag . '-FILE-SIZE', $stat->size );

        my $shrinked = 0;
        my $gdbm     = tied(%TIED_STATUS);
        if ( ref($gdbm) =~ /^GDBM_File/i ) {
            &TIMESTAT($stat_tag);
            $shrinked = $gdbm->reorganize();
            &TIMESTAT($stat_tag);
        }

        &Info("Bad shrink on $lockid GDBM jobsstatus file: $!")
            unless ( defined($shrinked) and $shrinked > -1 and !$! );

        $shrink_timer->{$lockid} = [ gettimeofday() ];
    }
    else {
        &UPSTAT( $stat_tag . '-SKIPPED' );
    }

    return 1;
}

# Predefined debug messages, selected by number in StatusDebug()
my %StatusDebug = (
    1 => "Cleaning jobstatus objects tied in hash",
    2 => "Get status object",
    3 => "Cleaning jobstatus objects cache",
    4 => "Updating object status",
    5 => "Updating object infos",
    6 => "Saving objects status to tied hash",
    7 => "Status update done"
);

my %MESG = ();    # Message => [ repeat count, last debug time ]

# Specific debugging handling to keep good performance when not activated.
# The argument is a message or a %StatusDebug key. A same message is only
# output once by minute, with its repeat count when repeated.
sub StatusDebug {
    return unless ($ADVANCED_DEBUGGING);

    my $add = "";

    if ( defined( $MESG{ $_[0] } )
        and my ( $repeat, $timer ) = @{ $MESG{ $_[0] } } )
    {
        # Known message: count it and only output it after a minute
        $MESG{ $_[0] }->[0]++;
        return unless ( tv_interval($timer) >= 60 );
        $add = " ($repeat times)" unless ( $repeat == 1 );
    }
    else {
        $MESG{ $_[0] } = [1];
    }

    $MESG{ $_[0] }->[1] = [ gettimeofday() ];

    &Debug( $_[0] =~ /^\d+$/ ? $StatusDebug{ $_[0] } . $add : "@_$add" );
}

my $RESYNCH_TIMEOUT = 10;    # 10 seconds before synchronizing in tied hash

# Sync the status objects of a lockid from the tied hash to DB.
# Arguments: the lockid and the mode (true when tied as writer).
# Returns true if we need to recheck as writer (count of finished status
# which tied timer has timed out).
sub sync_lockid {
    my $lockid = shift;
    my $mode   = shift;

    # Check initialization
    $sync_list[3]->{$lockid} = {}
        unless ( exists( $sync_list[3]->{$lockid} ) );
    my $resynch_list = $sync_list[3]->{$lockid};

    my $status_count   = 0;
    my $resynch_needed = 0;
    my $timeout        = &ms( -$RESYNCH_TIMEOUT );
    my @keys;    # List of keys to handle

    # Get a list of keys to check regarding the called mode
    if ($mode) {
        # As writer we should also synchronize with DB in the same time
        my @resynch_keys = keys( %{$resynch_list} );
        &MAXMINSTAT( 'SYNC-LOCKID-RESYNCH-KEYS-' . $lockid,
            scalar(@resynch_keys) );
        @keys = grep { $resynch_list->{$_} < $timeout } @resynch_keys;
    }
    else {
        # Search for status to synchronize with DB
        # Get also new status from shared list
        @keys = ( &sync_new_list($lockid), &sync_list($lockid) );
    }

    my @too_quick = ();

    # Set a counter to limit processing to not overload the service
    # But we should process at least 10% of the list in case of big load
    # (only used by the disabled timing control at the loop end)
    my $long_processing     = 0;
    my $max_long_processing = @keys > 256 ? @keys >> 4 : 16;

    while (@keys) {
        my $key = shift @keys;
        $status_count++;

        # Get the status object or continue
        my $status = A2P::Status->new( \%STATUS, \%TIED_STATUS, $key )
            or next;

        # Skip if not a status
        next unless ( &is_status($status) );

        # Keep this status in cache for a while if we are searching for
        if (
            $search_status
            and (  $status->is_job eq $search_status
                or $status->{JOBID} eq $search_status
                or $status->{AFP} eq $search_status )
            )
        {
            $status->cached;
            $search_status = '';
        }

        # Don't recheck status too quickly
        &UPSTAT( 'SYNC-LOCKID-QUICK-CHECK-DB-SYNCH-' . $lockid );
        if ( !$mode and $status->checking_too_quickly() ) {
            push @too_quick, $key if ( $status->dirty_bit(3) );
            &UPSTAT( 'SYNC-LOCKID-TOO-QUICK-DB-SYNCH-' . $lockid );
            next;
        }

        # Remark: The dirty bit 3 should only be reset when every steps are
        # finished to guaranty the status is in DB

        # Synchronize status with DB
        if ( $status->sync_with_db ) {
            &UPSTAT( 'SYNC-LOCKID-DB-SYNCHED-' . $lockid );

            if ( $status->is_finished ) {
                &UPSTAT( 'SYNC-LOCKID-FINISHED-' . $lockid . '-' . $mode );

                if ($mode) {
                    &Debug("Synchro $lockid to DB for $key completed")
                        if ($ADVANCED_DEBUGGING);
                    &UPSTAT( 'SYNC-LOCKID-DB-SYNCH-COMPLETED-' . $lockid );

                    # Will save to tied so reset dirty bit 3 just before
                    $status->unset_dirty_bit(3);

                    # Avoid check completed status
                }
                elsif ( $status->dirty_bit(3) ) {
                    # A resynch is needed if tied timer has reached a time out
                    # This can be seen as an activity delay on this status
                    my $resynch = $timeout - $status->is_tied_timer;
                    &MAXMINSTAT( 'SYNC-LOCKID-FINISHED-TIMER-' . $lockid,
                        $resynch );
                    if ( $resynch > 0 ) {
                        $resynch_needed++;
                        $resynch_list->{$key} = $status->is_tied_timer;
                    }
                }
            }
            else {
                &Debug( "Synchro $lockid to DB for $key done at rev"
                        . $status->db_revision )
                    if ($ADVANCED_DEBUGGING);

                # Still set it to resynchronize but don't force it now
                $resynch_list->{$key} = $status->is_tied_timer;
            }
        }
        else {
            &UPSTAT( 'SYNC-LOCKID-BAD-DB-SYNCH-' . $lockid );
            &Debug("Bad synchronization of $lockid $key status to DB");
        }

        # We should check again this status until finished
        &in_expected_sync_list( $lockid, $key )
            if ( $status->dirty_bit(3) );

        # Be sure to only save when job is really finished
        if ( $mode and $status->is_finished ) {
            # We can update DBM with current SID
            &Debug( "Updating $key in $lockid tied hash with sid "
                    . $status->get_sid_key )
                if ($ADVANCED_DEBUGGING);

            unless ( $status->save_tied('not a revision') ) {
                &Debug(
                    "Can't update tied status $key in $lockid mem hash: $!");

                # We shouldn't continue to update now, but keep in mind what we
                # have to do, so release the ressource very quickly
                &UPSTAT( 'SYNC-LOCKID-DELAY-DBM-SYNCH-' . $lockid );
                last;
            }

            # Update recheck hash
            delete $resynch_list->{$key};

            # Don't keep tied hash too much time so don't write too much status
            last if ( $status_count > 4 );
        }
        else {
            # Also control timing when not gdbm writer
            # last if ( &_micro_timing(1) and $long_processing++ > $max_long_processing );
        }
    }

    # Inject pending keys
    &sync_list_inject_again( $lockid, \@keys )      if @keys;
    &sync_list_inject_again( $lockid, \@too_quick ) if @too_quick;

    &MAXSTAT( 'SYNC-LOCKID-COUNTED-STATUS', $status_count );
    &MAXSTAT( 'SYNC-LOCKID-RESYNCH-NEEDED', $resynch_needed );

    return $resynch_needed;
}

# This API checks any available jobstatus DBM file and synchronizes the
# status objects found with DB. Each file is first checked as reader, then
# rechecked as writer when sync_lockid() asks for it.
# Returns the count of DBM files which could not be synchronized.
sub sync_dbm_to_db {
    &TIMESTAT('SYNC-DBM-TO-DB');
    &UPSTAT('SYNC-DBM-TO-DB');

    my %DBMS    = ();
    my %recheck = ();

    # Get the list of available DBM file with the current base
    # The list is a hash which LOCKIDs as keys
    my $base = &dbm_base();

    # Quote the path as it contains at least a dot, and is not a pattern
    my $base_re = qr/^\Q$base\E(.+)$/;
    foreach my $file ( glob( $base . '*' ) ) {
        next if ( $file =~ /\.LCK$/ or -d $file );
        $DBMS{$1} = 1 if ( $file =~ $base_re );
    }

    # Initialize timing check to not overload the computer
    &_micro_timing( 0, $USLEEP >> 1 );    # MAXTIME = USLEEP / 2

    my $sync_count = scalar( keys(%DBMS) );
    my $retries    = -1;

    # 2. Read each files and updates our agregation hash
    while ( ( $sync_count or keys(%recheck) ) and $retries++ < 10 ) {
        my @lockids = grep { $DBMS{$_} } keys(%DBMS);
        push @lockids, keys(%recheck)
            unless ( @lockids and $retries < 10 );

        # Leave loop when nothing's to do
        last unless (@lockids);

        foreach my $lockid (@lockids) {
            &UPSTAT( 'SYNC-DBM-LOCKID-' . $lockid );

            # Check to reopen gdbm as writer
            my $db_mode = 0;
            if ( exists( $recheck{$lockid} ) ) {
                &UPSTAT( 'SYNC-DBM-AS-WRITER-' . $lockid );
                $db_mode = 1;
                delete $recheck{$lockid};
            }

            # Get the lock on tie file updating %TIED_STATUS reference
            my $is_tied = &tie_status_hash( $lockid, $db_mode );

            # Check we got expected mode
            unless ( $db_mode == $CURRENTMODE ) {
                &UPSTAT( 'SYNC-DBM-LOCKID-' . $lockid
                        . '-ERROR-MODE-' . $db_mode );
                next;
            }

            # Really re-sync if tied
            if ( defined($is_tied) ) {
                &TIMESTAT( 'SYNC-DBM-LOCKID-' . $lockid . '-' . $db_mode );

                $DBMS{$lockid} = 0;
                $sync_count--;

                $recheck{$lockid} = 1
                    if ( &sync_lockid( $lockid, $db_mode ) );

                # Untie the hash after check to clean it
                undef $is_tied;
                untie %TIED_STATUS;

                &TIMESTAT( 'SYNC-DBM-LOCKID-' . $lockid . '-' . $db_mode );

                # Try to clean some entries in tied hash
                &_tiehash_cleaner($lockid);
            }

            # Anyway unlock the dbm file
            &dbm_unlock($lockid);
        }

        # Sleep a little if we couldn't be able to read a GDBM as other process
        # can access it, unless resynch is needed
        if ( $sync_count > 0 ) {
            &Debug("Sleeping as some status has not been kept");
            &UPSTAT('SYNC-DBM-BAD-SYNC-COUNT');
            usleep($USLEEP);
        }
    }

    # Compute few service statistics
    &MAXSTAT( 'SYNC-DBM-RETRY-SYNC', $retries ) if ($retries);

    &TIMESTAT('SYNC-DBM-TO-DB');

    # Then clean our cache
    &cache_cleaner();

    return $sync_count;
}

my @micro_timer = ( 0, 0, 0 );    # Last time, timeout time, max sleep (in us)

# Small timing control API.
# Called with a false flag, it initializes the timer: the optional second
# argument is the time to sleep in later calls (default: $USLEEP) and the
# timeout is set 4 x $USLEEP later.
# Called with a true flag, it returns 1 if less than the max sleep time has
# passed since last timer update, it sleeps 1 ms and returns 0 if the
# timeout is reached, otherwise it sleeps the max sleep time.
# Unless said, it returns the current time in microseconds.
sub _micro_timing {
    my $flag = shift;

    my @time = gettimeofday();    # Get current timer
    my $time = int( $time[0] * 1000000 + $time[1] );

    # First call with zero flag make timer initialization
    if ($flag) {
        my $delta = $time - $micro_timer[0];

        # Exit if timer is not reached
        return 1 unless ( $delta > $micro_timer[2] );

        &MAXSTAT( $LOCKID . '-MICRO-TIMING-ON-DELTA', $delta );
        &UPSTAT( $LOCKID . '-USLEEP-' . $micro_timer[2] . '-IN-MICRO-TIMING' );

        # Timeout reached: only sleep a little and say it
        unless ( $time < $micro_timer[1] ) {
            usleep(1000);
            return 0;
        }

        usleep( $micro_timer[2] );
    }
    else {
        # USLEEP * 4 should not be reached, set this as timeout
        # ("+" has a higher precedence than "<<": parenthesis are mandatory)
        $micro_timer[1] = $time + ( $USLEEP << 2 );

        # Second argument is then the maxtime to sleep in later calls
        $micro_timer[2] = shift || $USLEEP;
    }

    return $micro_timer[0] = $time;    # Update timer
}

# This API get each jobstatus found in DB
# It populates the local cache hash and returns a reference to this hash
# Used from service-lib.pl in afp2print Webmin module
# Arguments: lockid filter (or a call-back), job name filter, day filter,
# status filter, sql, then the DBI DSN, user and password.
# Returns the &Error() result when the DBI configuration is missing.
sub getstatus_from_db {
    # Some filters can be passed
    my $lockid_filter = shift || "";    # Next are references
    my $jname_filter  = shift || "";    # Can be a ref regex
    my $day_filter    = shift || "";
    my $status_filter = shift || "";    # Can be a ref regex
    my $sql           = shift || "";    # Can be a hash ref

    unless ( @_ == 3 ) {
        return &Error("DBI configuration not provided with: '@_'");
    }

    # Update DBI access configuration
    $DBI_DSN    = shift;
    $DBI_USER   = shift;
    $DBI_PASSWD = shift;

    # Can be better called with a call-back
    my $filter = ref($lockid_filter) =~ /^CODE$/ ? $lockid_filter : sub { 0 };

    # The call-back is first called without argument
    $lockid_filter = "" if ( $filter->() );

    # NOTE(review): $filter is always a code reference, so always true, and
    # the following list is then always empty: job name, day and status
    # filters are never applied. Kept as is not to change results seen by
    # callers - confirm if the test should be on a provided call-back
    my @filter =
        $filter ? () : ( \$jname_filter, \$day_filter, \$status_filter );

    my $status = A2P::Status->new( \%STATUS, \%TIED_STATUS );

    # Remove this first from cache as it is just an API accessor
    $status->remove_from_cache if ( defined($status) and $status );

    while ( defined($status)
        and $status
        and $status->get_next_from_db($sql) )
    {
        next unless $status->is_job;

        # Insert a copy in the cache for our JOB name
        $STATUS{ $status->is_job } = $status->clone
            unless ( ( @filter and $status->is_filtered(@filter) )
            or ( $filter->() and $filter->($status) ) );
    }

    # Don't keep this entry as not a status
    delete $STATUS{__SID__};

    return \%STATUS;
}

# Return 1 if the argument is an A2P::Status object, 0 otherwise
sub is_status {
    return 0 unless @_;

    return ref( $_[0] ) =~ /^A2P::Status$/ ? 1 : 0;
}

# Return 1 if the given key of the tied hash is a "_SID_<number>" key or if
# its value is a freezed status. Return 0 for a revision key (ending with
# _rev) and for a key which value references another key of the tied hash.
sub is_tied_status {
    return 0 unless @_;

    &TIMESTAT('IS-TIED-STATUS?');

    my $key       = shift;
    my $is_status = 0;

    if (
           $key =~ /_rev$/
        or exists( $TIED_STATUS{ $TIED_STATUS{$key} } )
        or (    $TIED_STATUS{$key} =~ /^\d+$/
            and exists( $TIED_STATUS{ '_SID_' . $TIED_STATUS{$key} } ) )
        )
    {
        # Return false if key is a rev key
        # Check also if its a reference to another key and then return false
        &UPSTAT('IS-TIED-STATUS-NO');
    }
    elsif ($key =~ /^_SID_\d+$/
        or $TIED_STATUS{$key} =~ _freezed_status_re() )
    {
        $is_status = 1;
        &UPSTAT('IS-TIED-STATUS-YES');
    }
    else {
        &UPSTAT('IS-TIED-STATUS-NO-2');
    }

    # NOTE(review): this closing call was missing whereas any other TIMESTAT
    # is called by pair in this file - confirm against A2P::Syslog
    &TIMESTAT('IS-TIED-STATUS?');

    return $is_status;
}

# Private hash to help clean tiehashs from a2p-status
my %th_cleaner = ( TIMER => {}, LIST => {} );
my %lost       = ();

# Remove aged status of a lockid from its tied hash.
# A first call prepares, as reader, the list of status keys to check and
# returns 0. Next calls process as writer up to 10 keys of that list and
# return 1. A new list is only prepared 5 minutes after the previous one.
# Returns 0 when $STATUS_MAXAGE is not set.
sub _tiehash_cleaner {
    # Still return if MAXAGE is not set (dangerous)
    return 0 unless ($STATUS_MAXAGE);

    my $lockid = shift;

    # Get the current processed list
    my $list =
        exists( $th_cleaner{LIST}->{$lockid} )
        ? $th_cleaner{LIST}->{$lockid}
        : [];

    # Return if list is empty and timer list is lower than 5 minutes, but do a
    # clean on the first call
    return 0
        if (!@{$list}
        and exists( $th_cleaner{TIMER}->{$lockid} )
        and tv_interval( $th_cleaner{TIMER}->{$lockid} ) < 300 );

    &UPSTAT( 'TH-CLEANER-API-' . $lockid );

    unless ( @{$list} ) {
        # Try to access tiehash as reader when preparing the list
        return 0 unless ( defined( &tie_status_hash( $lockid, 0 ) ) );

        &TIMESTAT( 'TH-CLEANER-INIT-' . $lockid );
        &UPSTAT( 'TH-CLEANER-INIT-API-' . $lockid );

        # Update timer now
        $th_cleaner{TIMER}->{$lockid} = [ gettimeofday() ];

        # Initialize hash keys list, being sure to only have status keys
        $list = [ grep { &is_tied_status($_) } keys(%TIED_STATUS) ];
        $th_cleaner{LIST}->{$lockid} = $list;

        &StatusDebug(1);

        # This value indicates which $MAX_CACHED_STATUS we should set as conf
        &MAXSTAT( 'TH-CLEANER-INIT-COUNT-' . $lockid, scalar( @{$list} ) );
        &TIMESTAT( 'TH-CLEANER-INIT-' . $lockid );

        untie %TIED_STATUS;

        # Still return anyway
        return 0;
    }

    &TIMESTAT( 'TH-CLEANER-' . $lockid );

    # Try to access tiehash as writer
    &tie_status_hash( $lockid, 1 );

    # Short cut the processing loop in case we are not writer
    my $maxcount = &cansavetied() ? 10 : 0;

    # Only process a limited count of jobs by call
    my $count = 0;
    while ( @{$list} and $count++ < $maxcount ) {
        my $job = shift @{$list};

        # Skip still removed entry
        next unless ( exists( $TIED_STATUS{$job} ) );

        # Reget this status from/into the cache
        my $status = A2P::Status->new( \%STATUS, \%TIED_STATUS, $job )
            or next;

        # Check the age
        &MAXSTAT( 'TH-CLEANER-STATUS-REAL-AGE-' . $lockid,
            $status->real_age );

        if ( $status->is_done_and_timer_aged ) {
            &StatusDebug("Cleaning $job in tied hash");
            $status->remove_from_cache;
            $status->remove_from_tied_hash;
            &UPSTAT( 'TH-CLEANER-STATUS-CLEANED-' . $lockid );
        }
    }

    &TIMESTAT( 'TH-CLEANER-' . $lockid );

    # Check also to shrink GDBM some time
    &_shrink_gdbm_file($lockid) if $maxcount;

    # Untie before leaving
    untie %TIED_STATUS;

    return 1;
}

my $cache_timer     = undef;    # Timer to handle cache check
my @cache_checklist = ();       # Cache keys still to check
my @cache_ages      = ();       # Sorted timers found in cache, older first
my %age_checklist   = ();       # Timer => list of cache keys
my $cache_older     = 0;        # Older timer known in cache

# Clean cache object each minute (objects will be reloaded from
# tied file if deleted from cache).
# An argument should only be passed for testing purpose: it disables the
# one minute delay and gives the max count of objects to check.
# Returns 0 when nothing has to be done, the count of objects to check when
# the check list was just prepared, the count of status kept when the cache
# size was reduced, otherwise the loop counter of the age check.
sub cache_cleaner {
    return 0
        if (
        !$STATUS_CACHE_MAXAGE
        or (    !@_
            and defined($cache_timer)
            and tv_interval($cache_timer) < 60 )

        # Also return if known max cache age is not reached
        or (    $cache_older
            and 1000 * time() - $cache_older < $STATUS_CACHE_MAXAGE )
        );

    # Remove still unused keys if test case only
    if (@_) {
        foreach my $key ( grep { /^__/ } keys(%STATUS) ) {
            delete $STATUS{$key};
        }
    }

    &UPSTAT('STATUS-OBJECT-CACHE-CLEANER-API');

    # Arguments should only used for testing purpose
    my $max_loops = shift || 10;

    unless (@cache_checklist) {
        &TIMESTAT('CACHE-CLEANER-INIT');

        my @keys = keys(%STATUS);
        &MAXSTAT( 'CACHE-KEYS-COUNT', scalar(@keys) );

        # Be sure to only have status keys, and filter duplicate references
        @cache_checklist = ();
        foreach my $job ( grep { &is_status( $STATUS{$_} ) } @keys ) {
            push @cache_checklist, $job
                unless ( grep { $STATUS{$job} == $STATUS{$_} }
                @cache_checklist );
        }

        # Prepare a list of ages based on timers if still not done
        unless ( keys(%age_checklist) ) {
            %age_checklist = ();
            foreach my $job (@cache_checklist) {
                push @{ $age_checklist{ $STATUS{$job}->timer } }, $job;
            }

            # List the ages: timers are compared as numbers, first is older
            @cache_ages = sort { $a <=> $b } keys(%age_checklist);
        }

        # Set now the timer
        $cache_timer = [ gettimeofday() ];

        &StatusDebug(3);
        &TIMESTAT('CACHE-CLEANER-INIT');

        # Return anyway
        return scalar(@cache_checklist);
    }

    my $status_count = @cache_checklist;
    &MAXSTAT( 'STATUS-OBJECT-KEPT-IN-CACHE', $status_count );

    if ( $status_count > $MAX_CACHED_STATUS ) {
        # Manage cache size
        &UPSTAT('STATUS-OBJECT-MAX-CACHED-REACHED');
        &TIMESTAT('CACHE-CLEANER-MAX-REACHED');

        $cache_older = shift @cache_ages;

        my $limit = abs( $MAX_CACHED_STATUS - 10 ) + 1;   # Paranoid temp limit

        while ( defined($cache_older)
            and $cache_older
            and $status_count >= $limit )
        {
            my $aged = shift @{ $age_checklist{$cache_older} };

            if ( defined($aged) and $aged and exists( $STATUS{$aged} ) ) {
                &UPSTAT('CACHED-STATUS-OBJECT-EARLY-CLEANED');
                $status_count--;
                $STATUS{$aged}->remove_from_cache;
            }

            unless ( @{ $age_checklist{$cache_older} } ) {
                # Don't forget to forget that age
                delete $age_checklist{$cache_older};
                $cache_older = shift @cache_ages;
            }
        }

        # Filter on existing keys
        @cache_checklist = grep { exists( $STATUS{$_} ) } @cache_checklist;

        &TIMESTAT('CACHE-CLEANER-MAX-REACHED');

        # Return anyway
        return $status_count;
    }

    &TIMESTAT('CACHE-CLEANER');

    my $count = 0;

    # Don't check too much cached values at a time
    while ( @cache_checklist and $count++ < $max_loops ) {
        my $job = shift @cache_checklist;

        # Avoid cleaning not job or lost keys
        next
            unless ( exists( $STATUS{$job} )
            and &is_status( $STATUS{$job} ) );

        if ( defined( $STATUS{$job}->check_cached_aged ) ) {
            &UPSTAT('CACHED-STATUS-OBJECT-KEPT');
        }
        else {
            &UPSTAT('CACHED-STATUS-OBJECT-CLEANED');
        }
    }

    &MAXSTAT( 'CACHED-STATUS-OBJECT-CHECKED', $count );
    &TIMESTAT('CACHE-CLEANER');

    return $count;    # Only used in tests
}

my @pending = ();    # Status updates not yet saved in tied hash

# Update the status of a job in the tied hash of $LOCKID.
# Arguments: job name, step, status and an optional infos hash reference.
# Can be called without argument to compute any pending status: updates
# which can't be done are kept in a pending queue for the next call.
# Returns nothing when there's nothing to do, otherwise a reference to the
# tied hash, which is untied before leaving.
sub tied_update_status {
    return unless ( @_ or @pending );

    my $Job    = shift;
    my $Step   = shift;
    my $Status = shift;
    my $Infos  = shift || {};

    push @pending, [ $Job, $Step, $Status, $Infos ] if ( defined($Job) );

    # 1. We need to tie our hash to GDBM file before continuing
    my $retries = 0;
    my $timeout = [ gettimeofday() ];

    $! = 0;
    while ( !( defined( &tie_status_hash($LOCKID) ) and &cansavetied() ) ) {
        # Error 11 is counted as a busy file, not reported as a warning
        ( $! == 11 )
            ? &UPSTAT('BUSY-TIED-HASH')
            : &Warn("Can't lock GDBM file: $!");

        $! = 0;
        &dbm_unlock();
        tied(%TIED_STATUS) and untie %TIED_STATUS;
        &Warn("Can't unlock GDBM file: $!") if $!;

        ++$retries and &MAXSTAT( 'FORCED-UPDATE-STATUS-RETRY', $retries );

        # Give up after a minute, next loop will keep status as pending
        if ( tv_interval($timeout) > 60 ) {
            &UPSTAT('HASH-NOT-TIED-AFTER-60s');
            last;
        }

        # Wait more and more between retries
        usleep(
              $retries < 5  ? $USLEEP >> 1
            : $retries < 10 ? $retries * $USLEEP
            :                 10 * $USLEEP
        );

        $! = 0;
    }

    while (@pending) {
        my $ref = shift @pending;
        ( $Job, $Step, $Status, $Infos ) = @{$ref};

        # Strip any number at the end of job name
        $Job =~ s/-\d+$//;

        # 2. Get the status object
        my $jobstatus = A2P::Status->new( \%STATUS, \%TIED_STATUS, $Job );

        unless ( defined($jobstatus) and $jobstatus ) {
            &Warn("Can't update Job status of $Job");
            unshift @pending, $ref;
            last;
        }

        # 3. Updating object status
        &StatusDebug(4);
        &Warn("$Job status at step $Step to $Status not updated")
            unless ( $jobstatus->step_status( $Step, $Status ) );

        # 4. Update object with infos
        &StatusDebug(5);
        $jobstatus->infos($Infos);

        # 5. Update object timer to reset its cache age
        $jobstatus->cached;

        # 6. Saving tied objects
        &StatusDebug(6);
        $jobstatus->save_tied;
    }

    # 7. Then we can untie our hash if really tied
    &StatusDebug(7);
    untie %TIED_STATUS;

    # 8. Also unlock DBM file if locked
    &dbm_unlock();

    # Keep internal stats
    if (@pending) {
        &DebugPendingQueue();
        $STATS{'PENDING-JOBSTATUS'} = join( ';', map { "@{$_}" } @pending );
    }
    elsif ( exists( $STATS{'PENDING-JOBSTATUS'} ) ) {
        &MAXSTAT( 'PENDING-JOBSTATUS-COUNT', scalar(@pending) );
        delete $STATS{'PENDING-JOBSTATUS'};
    }

    return \%TIED_STATUS;
}

my $debug_pending_timer;    # Last time the pending queue was debugged

# Debug a summary of the pending queue by job: range of pending steps, with
# a specific report for jobs having a pending 'A' (ABTERM) status.
# Does nothing unless $ADVANCED_DEBUGGING is set.
sub DebugPendingQueue {
    return unless ($ADVANCED_DEBUGGING);

    # Handle timer, debug only at max each second
    # (the timer is undefined on first call, we must debug in that case)
    return
        if ( defined($debug_pending_timer)
        and tv_interval($debug_pending_timer) <= 1 );

    $debug_pending_timer = [ gettimeofday() ];

    my %pendings = ();

    # Analyse list
    foreach my $entry (@pending) {
        my ( $n, $s, $S, $I ) = @{$entry};

        $pendings{$n} = { STEPMIN => 12, STEPMAX => 0, STATUS => '' }
            unless ( exists( $pendings{$n} ) );

        $pendings{$n}->{STEPMIN} = $s if ( $s < $pendings{$n}->{STEPMIN} );
        $pendings{$n}->{STEPMAX} = $s if ( $s > $pendings{$n}->{STEPMAX} );

        # An ABTERM always replaces the kept status, and a kept 'A' or 'o'
        # status is not replaced by another one
        if (
            $S eq 'A'
            or (    $pendings{$n}->{STATUS} ne 'A'
                and $pendings{$n}->{STATUS} ne 'o' )
            )
        {
            $pendings{$n}->{STEP}   = $s;
            $pendings{$n}->{STATUS} = $S;
        }
    }

    my @pendings = keys(%pendings);
    &StatusDebug( scalar(@pendings) . " pending status updates" );

    my @abterms = grep { $pendings{$_}->{STATUS} eq 'A' } @pendings;
    if (@abterms) {
        &StatusDebug( scalar(@abterms)
                . " pending ABTERM status update (@abterms)" );

        foreach my $job (@abterms) {
            &StatusDebug(
                "$job ABTERM: status steps: "
                    . $pendings{$job}->{STEPMIN} . " - "
                    . $pendings{$job}->{STEPMAX}
                    . (
                    exists( $pendings{$job}->{STEP} )
                    ? " ; ABTERM Step: " . $pendings{$job}->{STEP}
                    : ""
                    )
            );
            delete $pendings{$job};
        }
    }

    # Get again list after ABTERM keys deletion
    @pendings = keys(%pendings);
    if (@pendings) {
        &StatusDebug( scalar(@pendings) . " pending ok status update:" );

        # Report remaining jobs (the ABTERM list was wrongly parsed here)
        foreach my $job (@pendings) {
            &StatusDebug( "$job: status steps: "
                    . $pendings{$job}->{STEPMIN} . " - "
                    . $pendings{$job}->{STEPMAX} );
        }
    }
}

END {
    # Try to purge pending status updates
    &tied_update_status();

    if ($ADVANCED_DEBUGGING) {
        # Output a last time any repeated debug message
        foreach my $mesg ( keys(%MESG) ) {
            next unless ( $MESG{$mesg}->[0] > 1 );
            $MESG{$mesg}->[0]--;
            $MESG{$mesg}->[1] = [ 0, 0 ];    # Force the message timeout
            &StatusDebug($mesg);
        }
    }

    # Save in_expected sync lists in SharedList lists
    foreach my $lockid ( keys( %{ $sync_list[1] } ) ) {
        my @expected = keys( %{ $sync_list[1]->{$lockid} } );
        my $shared = &SharedList( $lockid, 'forced' );
        push @{$shared}, @expected if ( ref($shared) =~ /^ARRAY/i );
        &FreeSharedList($lockid);
    }
}

&Debug( "Module " . __PACKAGE__ . " v$VERSION loaded" );

1;