source: A2P/a2p/A2P/Status.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 34.4 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: Status.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22# Class to implement one job status object handling its tied version
23#
24
25package A2P::Status ;
26
27use strict;
28use Storable qw( freeze thaw );
29use Time::HiRes  qw( usleep );
30use A2P::Globals ;
31use A2P::Syslog ;
32use A2P::Tools qw( SharedList FreeSharedList ms );
33use A2P::DB qw( job_status_update is_job_status_updated get_next_jobstatus
34                job_status_delete );
35
36BEGIN {
37    our $VERSION = sprintf "%s", q$Rev: 1158 $ =~ /(\d[0-9.]+)\s+/ ;
38}
39our $VERSION ;
40
41# Set initial status constant to default array at compilation
42sub INTIAL_STATUS { split(//,'o__00000000__') }
43
44my $CACHE   = {} ;    # Status object cache
45my $TIEHASH = {} ;    # Tied hash
46my $DB      = {} ;    # Status DB informations
47my $SID_INDEX = 0 ;   # Index to initialize status unique ID, only in a2p-status
48my %MESG    = () ;    # Used in StatusDebug to avoid same messages
49
50### Private members related to SID management ###
51# Reference of sid cache list
52sub SID_CACHE { defined($CACHE->{__SID__}) ? $CACHE->{__SID__} :
53    ( $CACHE->{__SID__} = {} ) }
54# Only call _get_sid_tied_key when we are sure first arg is a number
55sub _get_sid_tied_key { '_SID_' . (@_ ? $_[0] : "" ) }
56my $MATCH_SID_TIED_KEY = qr/^_SID_(\d+)$/ ;
57# Revision keys extension
58sub _rev_ext    {   '_rev'  }
59sub _rev_ext_re { qr/_rev$/ }
60#################################################
61
62sub _is_tie_writer {
63    # Status object must not be used before this key is set in cache
64    return ( tied(%$TIEHASH) and defined($CACHE->{__GDBM_WRITER__})
65        and $CACHE->{__GDBM_WRITER__} );
66}
67
68sub new {
69    my $class = shift ;
70    my $self  = undef ;
71
72    # Return if CACHE and TIEHASH are not hash
73    return $self
74        unless (ref($_[0]) =~ /^HASH/ and ref($_[1]) =~ /^HASH/);
75
76    &TIMESTAT('STATUS-GET-NEW');
77
78    $CACHE   = shift ;
79    $TIEHASH = shift ;
80    my $job  = shift || 'noname' ;
81
82    # Try to get the status from CACHE
83    if (defined(SID_CACHE->{$job})) {
84        $self = SID_CACHE->{$job}->check_cached_aged ;
85
86    } elsif (defined($CACHE->{$job})) {
87        $self = $CACHE->{$job}->check_cached_aged ;
88    }
89
90    unless (defined($self)) {
91        # Job not in cache, prepare to check tied hash by constructing a
92        # minimal object
93        $self = { JOB => $job, INITIAL => $job } ;
94        bless $self , $class ;
95        $self->_set_defaults ;
96    }
97
98    $self->is_job($job);
99
100    # Get ourself from tied hash if I was updated in tied hash
101    $self = $self->compared_with_one_from_tiehash ;
102
103    # Keep references in cache when not cached
104    $self->cached unless ($self->is_cached);
105
106    # Check SID: Set to 0 by default unless in a2p-status
107    $self->set_sid_index(&_found_sid) unless (defined($self->get_sid_index));
108
109    # Delete initial tag for newly created object used as ident when destroyed
110    if (exists($self->{INITIAL})) {
111        #&StatusDebug("new Status object v$VERSION created for $job");
112        delete $self->{INITIAL} ;
113    }
114
115    &TIMESTAT('STATUS-GET-NEW');
116    return $self ;
117}
118
119sub _set_defaults {
120    # Be careful, some defaults must only be set the first time
121    my $self = shift ;
122    $self->{JOB}    = "noname"      unless (defined($self->{JOB}));
123    $self->{AFP}    = $self->is_job unless (defined($self->{AFP}));
124    $self->{JOBID}  = ""            unless (defined($self->{JOBID}));
125    $self->{STEP}   = 2  ; # We are still at step when jobstatus is created
126    $self->{STATE}  = [ INTIAL_STATUS ] ;
127    $self->{LOCKID} = $LOCKID ;
128    $self->{DAY}    = 19700101 ;
129    $self->{BORN}   = 0             unless (exists($self->{BORN}));
130    $self->{REV}    = 0             unless (defined($self->{REV}));
131    $self->{TIMER}  = 0             unless (defined($self->{TIMER}));
132    map { $self->unset_dirty_bit($_) } 1..16 ; # Reset dirty bits
133}
134
135sub set_sid_index { # SID must set as a number
136    my $self = shift ;
137    my $sid  = shift ;
138    if (defined($sid) and $sid =~ /^\d+$/) {
139        $self->{SID} = $sid ;
140
141        # Update our predictive internal SID index counter
142        &_found_sid( $sid );
143
144    } elsif (exists($self->{SID})) {
145        delete $self->{SID} ;
146    }
147}
148
149sub get_sid_index { # Must return a number as SID
150    return defined($_[0]->{SID}) ? $_[0]->{SID} : 0 ;
151}
152
153sub get_sid_key {
154    # Return an unique SID key for the tied hash
155    my $self = shift ;
156    return ( $_[0] =~ /^\d+$/ ) ? _get_sid_tied_key($_[0]) : $_[0]
157        if (@_ and $_[0]);
158    # self->AFP is the default SID key
159    return $self->get_sid_index ?
160        _get_sid_tied_key($self->get_sid_index) : ( shift || $self->{AFP} ) ;
161}
162
163sub get_sid_version_key {
164    return (defined($_[1]) ?
165        ( $_[1] =~ /^\d+$/ ? $_[0]->get_sid_key($_[1]) : $_[1] ) :
166        $_[0]->get_sid_key ) . _rev_ext ;
167}
168
169sub _is_revision_key {
170    return $_[0] =~ _rev_ext_re ;
171}
172
173sub _found_sid {
174     # API only useful in a2p-status
175    return ( $_[0] > $SID_INDEX ? $SID_INDEX = shift : shift )
176        if (@_ and $_[0] =~ /^\d+$/);
177    return 0 unless ($SID_INDEX);
178    $SID_INDEX ++ ;
179    return (defined(SID_CACHE->{$SID_INDEX})
180        or defined($TIEHASH->{_get_sid_tied_key($SID_INDEX)})) ?
181        &_found_sid : $SID_INDEX ;
182}
183
184sub current_in_tie_sid_key {
185    my $self = shift ;
186    my $sid  = $self->is_job ;
187    my $key  = 0 ;
188    my %REFS = () ; # Reference counters to avoid infinite loops
189
190    # Retrieve the tiehash SID as a number (it is 0 if not a SID number)
191    while ($sid and $sid !~ /^\d+$/ and
192    (!defined($REFS{$sid}) or $REFS{$sid} < 2)) {
193        if ($sid =~ $MATCH_SID_TIED_KEY) {
194            # Extract SID from key
195            $sid = $1 ;
196
197        } else {
198            $REFS{$sid} ++ ; # Update reference counter to avoid infinite loops
199            # SID is a reference to another key or SID is invalid
200            if (defined($TIEHASH->{$sid})) {
201                if (defined($TIEHASH->{$self->get_sid_version_key($sid)})) {
202                    $key = $sid ;
203                    $sid = 0 ;
204                    last ;
205
206                } else {
207                    $sid = $TIEHASH->{$sid} ;
208                }
209
210            } else {
211                $sid = 0 ;
212            }
213        }
214    }
215
216    $key = $self->get_sid_key($sid) unless ($key);
217
218    # If not a numbered SID, we will retrieve from jobname
219    #&StatusDebug("Status SID for ".$self->is_job." has changed".
220    #    ($self->get_sid_index?" from ".$self->get_sid_index:"")." to $sid")
221    #    if ($sid and $self->get_sid_index != $sid);
222
223    return $key ;
224}
225
226sub compared_with_one_from_tiehash {
227    my $self = shift ;
228    my $job  = $self->is_job ;
229
230    # Retrieve the key of the object in the tied hash, needed to check version
231    my $sidkey = $self->current_in_tie_sid_key ;
232
233    if (defined($TIEHASH->{$sidkey})) {
234        #&StatusDebug("Checking $sidkey tied version");
235        # 1. The revision from tied hash can be a newer version
236        &TIMESTAT('STATUS-GET-COMPARED');
237        my $older = $self->is_older_than_tied($sidkey);
238        &TIMESTAT('STATUS-GET-COMPARED');
239        if ( $older ) {
240            &TIMESTAT('STATUS-GET-TIED');
241            &UPSTAT('HIT-TIED-STATUS');
242
243            # is_tied API must return 0 or a A2P::Status object
244            my $newself = $self->is_tied($sidkey);
245            if (defined($newself) and $newself) {
246                if ($newself != $self) {
247                    #&StatusDebug("Replacing $job by tied $sidkey (" .
248                    #    $newself->is_job . ") at rev. " .
249                    #    $newself->get_revision );
250
251                    # Don't forget to update it with current job name
252                    $newself->is_job($job);
253                    $self = $newself ;
254                }
255
256            } else {
257                &StatusDebug("Can't get $job Status from local file");
258            }
259            &TIMESTAT('STATUS-GET-TIED');
260
261        # 2. Our cached version is newer revision, we still got it
262        } else {
263            &UPSTAT('HIT-CACHED-STATUS');
264            #&StatusDebug("Keep cached '$job' status from memory at rev. "
265            #    . $self->get_revision);
266        }
267    }
268
269    return $self ;
270}
271
272sub is_done_and_timer_aged {
273    my $self = shift ;
274
275    # Only set the status is aged when parameter is set and status is DONE and
276    # the dirty bit 3 is not set (not synched in DB by a2p-status)
277    return 0 unless ( $STATUS_MAXAGE and $self->is_done
278        and ! $self->dirty_bit(3));
279
280    # Get the timer based age
281    my $age = $self->is_tied_timer ? &ms() - $self->is_tied_timer : 0 ;
282    &MAXSTAT("STATUS-TIMER-AGE",$age);
283    return $age > $STATUS_MAXAGE ? 1 : 0 ;
284}
285
286sub _born {
287    my $self = shift ;
288
289    # Set day status
290    my @time = localtime(time);
291    $self->{DAY} = ($time[5]%100 + 2000)*10000 + ($time[4]+1) *100 + $time[3] ;
292
293    # Set born time to the current millisecond from the epoch
294    $self->{BORN} = &ms() unless (defined($self->{BORN}) and $self->{BORN});
295}
296
297sub cached {
298    &TIMESTAT('STATUS-OBJECT-CACHED');
299    my $self = shift ;
300
301    # Update every important key in cache
302    map {
303        $CACHE->{$self->{$_}} = $self
304        } grep {
305            defined($self->{$_}) and $self->{$_}
306            } qw{
307                JOB JOBID AFP
308    };
309
310    # Check jobs reference in cache
311    my @jobs = defined($self->{JOBS}) ? @{$self->{JOBS}} : ();
312    map { $CACHE->{$_} = $self } grep { defined } @jobs ;
313
314    # Check SID usable in CACHE
315    SID_CACHE->{$self->get_sid_index} = $self
316        if $self->get_sid_index ;
317
318    # Set status is cached
319    $self->cache_timer ;
320    $self->is_cached(1);
321
322    &TIMESTAT('STATUS-OBJECT-CACHED');
323    1 ;
324}
325
326sub cache_timer {
327    # Update the cache timer
328    $_[0]->{TIMER} = &ms() ;
329}
330
331sub timer {
332    return $_[0]->{TIMER} || 0 ;
333}
334
335sub is_cached {
336    my $self = shift ;
337    return @_ ? ((defined($_[0]) and $_[0]) ?
338        $self->set_dirty_bit(10) : $self->unset_dirty_bit(10))
339        : $self->dirty_bit(10) ;
340}
341
342############## Dirty bits ############################ Dirty bits ##############
343# 1 -> Dirty cache: Cached status must be written to tied status hash
344# 2 -> Cache synched with DB, need to written new version in tied status hash
345# 3 -> Status is new or is not done in DB -> can't be removed from tied hash
346# 4 -> Cache and hash are synched: versioning can be updated in DB
347############## Dirty bits ############################ Dirty bits ##############
348sub dirty_bit {
349    my $self = shift ;
350    my $set  = shift || 1 ;
351    &StatusDebug("SET not defined") unless (defined($set));
352    if (@_) {
353        $self->{DIRTY} = 0 unless (exists($self->{DIRTY}));
354        vec($self->{DIRTY},$set,1) = (defined($_[0]) and $_[0])? 1 : 0 ;
355    }
356    return exists($self->{DIRTY}) ? vec($self->{DIRTY},$set,1) : 0 ;
357}
358
359sub set_dirty_bit {
360    # Set a bit with birty bit API
361    my $self = shift ;
362    my $bit  = shift || 1 ;
363    return $self->dirty_bit( $bit, 1 );
364}
365
366sub unset_dirty_bit {
367    # Unset a bit with birty bit API
368    my $self = shift ;
369    my $bit  = shift || 1 ;
370    return $self->dirty_bit( $bit, 0 );
371}
372
373sub get_cached_age {
374    return $_[0]->timer ? &ms() - $_[0]->timer : 0
375}
376
377sub check_cached_aged {
378    my $self = shift ;
379    # Check if our cache is too aged
380    if ($STATUS_CACHE_MAXAGE and defined($self)) {
381        my $age = $self->get_cached_age ;
382        if ( $age > $STATUS_CACHE_MAXAGE ) {
383            &MAXSTAT('STATUS-OBJECT-CACHE-AGE',$age);
384            &UPSTAT('CACHED-STATUS-OBJECT-CLEANED');
385            $self->remove_from_cache ;
386            $self = undef ;
387
388        } else {
389            # Set status is cached
390            $self->is_cached(1);
391        }
392    }
393    return $self ;
394}
395
396sub real_age {
397    return defined($_[0]->{BORN}) ? &ms() - $_[0]->{BORN} : 0 ;
398}
399
400sub StatusDebug {
401    return 1 unless $ADVANCED_DEBUGGING ;
402    my $add = "" ;
403    if ($0 !~ /^TEST/) {
404        if (defined($MESG{$_[0]})
405        and my ( $repeat, $timeout ) = @{$MESG{$_[0]}}) {
406            $MESG{$_[0]}->[0] ++ ;
407            return 1 unless ( &ms() >= $timeout );
408            $add = " ($repeat times)" unless ($repeat==1);
409
410        } else {
411            $MESG{$_[0]} = [ 1 ] ;
412        }
413    }
414    $MESG{$_[0]}->[1] = &ms(60) ; # Keep time out in one minute
415    my ($package, $filename, $line) = caller ;
416    &Debug( map { "at L.$line: " .$_ .$add } @_ );
417}
418
419sub get_lockid {
420    return defined($_[0]->{LOCKID}) ? $_[0]->{LOCKID} : $LOCKID ;
421}
422
423sub _format_info_to_keep {
424    my $this  = shift ;
425    my $key   = shift ;
426    my $value = shift ;
427
428    # Only handle the special case 'ABTERM'
429    return $value
430        unless ($key =~ /^ABTERM|ERRORS$/);
431
432    # Manage ABTERM case as growing array stripping ABTERM string
433    my $ref = ref($this->{$key}) =~ /^ARRAY/i ? $this->{$key} : [] ;
434    $value =~ s/^ABTERM[:]*\s*// ;
435    push @{$ref} , $value ;
436
437    return $ref ;
438}
439
440sub infos {
441    my $self  = shift ;
442    my $Infos = shift || {} ;
443
444    # Check $Infos is hash ref
445    return &Debug("Can't handle such information format ($Infos)") && 0
446        unless ( ref($Infos) =~ /^HASH/i );
447
448    my $tmpref = $self ;
449
450    # When called with a JID, we will have specific informations
451    if (defined($Infos->{JID})) {
452        # Update what we are for next informations
453        $self->{$Infos->{JID}} = {} unless (defined($self->{$Infos->{JID}}));
454        $tmpref = $self->{$Infos->{JID}} ;
455        $self->is_job($Infos->{JID});
456        delete $Infos->{JID} ;
457    }
458
459    while ( my ( $key, $info ) = each(%{$Infos}) ) {
460
461        $info = "" unless (defined($info));
462
463        # Strip not cesure chars and strip strings
464        $info =~ s|[^ .0-9A-Za-z%_/-]||g ;
465        $info =~ s/^\s+// ;
466        $info =~ s/\s+$// ;
467
468        $tmpref->{$key} = _format_info_to_keep($tmpref, $key, $info) ;
469    }
470
471    # Also auto update
472    $self->_auto_update ;
473
474    1 ;
475}
476
477sub _auto_update {
478    my $self = shift ;
479
480    # Check to update STATUS to ABTERM if ABTERM defined and STATUS is empty
481    $self->{STATUS} = 'ABTERM'
482        if (defined($self->{ABTERM}) and $self->{ABTERM}
483        # Control we are not trying to validate the job
484        and ( ref($self->{ABTERM}) !~ /^ARRAY/i
485            or $self->{ABTERM}->[$#{$self->{ABTERM}}] !~ /^validated/i )
486        and ( ! defined($self->{STATUS}) or ! $self->is_abterm ));
487
488    # Check job count
489    if (my ( $number ) = $self->{JOB} =~ /-(\d+)$/) {
490        # Update jobs index list with jid hash ref
491        $self->{JOBS}->[$number] = $self->is_job ;
492
493        $self->{NBJOBS} = $number
494            if (!defined($self->{NBJOBS}) or $self->{NBJOBS} < $number );
495    }
496}
497
498my $ko_status = qr/^ko|abterm$/i ;
499sub is_abterm {
500    return $_[0]->{STATUS} =~ $ko_status ;
501}
502
503my $done_status = qr/^DONE$/i ;
504sub is_done {
505    return $_[0]->{STATUS} =~ $done_status ;
506}
507
508sub is_filtered {
509    # Need to return 1 when not matching filters, filters are references
510    my $self = shift ;
511    my $job_filter = shift ;
512    my $day_filter = shift ;
513    my $status_filter = shift ;
514
515    return &Error("Unsupported filtering mode")
516        unless ( grep {
517            ref($_) =~ /^SCALAR/ } $day_filter, $job_filter, $status_filter
518            == 3 ) ;
519
520    # Filter on the day is done only
521    return 1
522        if ($$day_filter and ! defined($self->{'ABTERM'})
523            and $self->is_done and $self->{'DAY'} !~ /^$$day_filter$/);
524
525    # Filter on status
526    if ( $$status_filter ) {
527        my ( $not , $filter ) = $$status_filter =~ /^(!?)(.*)$/ ;
528        if ($filter = qr/^$filter$/i) {
529            return 1
530                if (($not and $self->{'STATUS'} =~ $filter) or
531                    (! $not and $self->{'STATUS'} !~ $filter));
532        }
533    }
534
535    return 0 unless ($$job_filter);
536
537    # We are filtering on a regex
538    my $filter = ref($$job_filter) =~ /^regex/i ? $$job_filter : qr/$$job_filter/i ;
539    return ( $self->{'AFP'} !~ $filter and $self->{'JOBID'} !~ $filter );
540}
541
542sub is_finished {
543    return ( $_[0]->{STEP} == 12 and $_[0]->step_status(12) eq 'o' ) ? 1 : 0 ;
544}
545
546sub is_job {
547    my $self = shift ;
548    $self->{JOB} = shift if (@_ and defined($_[0]));
549    return $self->{JOB} ;
550}
551
552sub is_tied_timer {
553    my $self = shift ;
554    $self->{TIED_TIMER} = shift if @_ ;
555    return $self->{TIED_TIMER} || 0 ;
556}
557
558sub is_tied {
559    # Have to update us against tied version
560    my $self = shift ;
561
562    my $sidkey = $_[0] || $self->get_sid_key(@_) ;
563    return 0 unless (exists($TIEHASH->{$sidkey}));
564
565    #&StatusDebug("Controling $sidkey from tied hash");
566    while ($TIEHASH->{$sidkey} =~ /^\d+$/) {
567        &Error("$sidkey sidkey is not status but reference to " .
568            $TIEHASH->{$sidkey});
569
570        my $rev_key = $self->get_sid_version_key($sidkey) ;
571        if (exists($TIEHASH->{$rev_key})) {
572            &Info("$rev_key revision key exists for $sidkey, removing it...");
573            delete $TIEHASH->{$rev_key} ;
574        }
575
576        my $newkey = $self->get_sid_key($TIEHASH->{$sidkey}) ;
577        unless (exists($TIEHASH->{$newkey})) {
578            delete $TIEHASH->{$sidkey} ;
579            return &Error("Bad reference to $sidkey");
580        }
581
582        $sidkey = $newkey ;
583    }
584
585    &TIMESTAT('IS-TIED-API');
586
587    # Work-around to a possible concurrencing case when key is updated in
588    # another process after a split job event, fix a a2p-status service crash
589    # TODO reproduce the case to found a better resolution
590    my $newself = $TIEHASH->{$sidkey} ;
591    {
592        my $tries = 10 ;
593        while ( $newself !~ /^\x04/ and $tries -- ) {
594            usleep $USLEEP ;
595            $newself = $TIEHASH->{$sidkey} ;
596        }
597        $SIG{'__DIE__'} = sub { undef $newself ; } ;
598        $newself = &thaw($newself) if ( $newself =~ /^\x04/ );
599    }
600
601    if (ref($newself) =~ /^A2P::Status/) {
602        #&StatusDebug("Retrieved $sidkey from tied hash");
603
604        # Control known SID
605        $self->set_sid_index( $self->db_sid )
606            unless ( $self->get_sid_index eq $self->db_sid );
607
608    } else {
609        &StatusDebug("Can't retrieved $sidkey as A2P::Status from tied hash");
610        $newself = 0 ;
611    }
612
613    &TIMESTAT('IS-TIED-API');
614
615    return $newself ;
616}
617
618sub remove_from_tied_hash {
619    # Still return if we don't have write access to tied file
620    return unless (_is_tie_writer);
621
622    my $self = shift ;
623    my $sid_key = $self->get_sid_key ;
624
625    # Remove from tied hash if really in tied hash
626    return unless (exists($TIEHASH->{$sid_key}));
627
628    &TIMESTAT('REMOVE-FROM-TIED');
629
630    &StatusDebug("Removing $sid_key from tied hash");
631    delete $TIEHASH->{$sid_key} ;
632    delete $TIEHASH->{$self->get_sid_version_key($sid_key)} ;
633
634    # Remove DB hash entry
635    $self->_DB_remove ;
636
637    # Remove any other reference to us, maybe chained references
638    my %remove_it = ( $sid_key => 1 );
639    my %not_ref_to = () ;
640
641    while ( my ( $key, $value ) = each(%{$TIEHASH}) ) {
642        # Skip well known not reference keys
643        next if ( $key =~ /_rev$/ );
644        my $next = $self->get_sid_key($value) ;
645        if ( $key =~ /^_SID_/ or $value =~ /\x04/
646        or exists($not_ref_to{$next}) or exists($not_ref_to{$key}) ) {
647            $not_ref_to{$key} = 1 ;
648            next ;
649        }
650
651        # Remove this key if it points to a well known reference to sid_key
652        if ( exists($remove_it{$next}) ) {
653            $remove_it{$key} = 1 ;
654
655        } else {
656            # Check chained reference
657            my %list = ( $key => 1 ) ;
658
659            while ( $value ) {
660                $value = $TIEHASH->{$next} || '' ;
661                if ( exists($remove_it{$next}) ) {
662                    map { $remove_it{$_} = 1 } keys(%list) ;
663                    last ;
664
665                } elsif ( $next =~ /^_SID_/ or $value =~ /\x04/
666                or exists($not_ref_to{$value}) ) {
667                    map { $not_ref_to{$_} = 1 } keys(%list) ;
668                    last ;
669                }
670
671                # Avoid looping infinitely on cross reference
672                last if (exists($list{$next}));
673                $list{$next} = 1 ;
674                $next = $self->get_sid_key($value) ;
675            }
676        }
677    }
678
679    # Now delete the found references, removing referenced key
680    delete $remove_it{$sid_key} ;
681    map { delete $TIEHASH->{$_} } keys(%remove_it) ;
682
683    &TIMESTAT('REMOVE-FROM-TIED');
684}
685
686sub remove_from_db {
687    my $self = shift ;
688    # Delete the row and eventually and eventually optimize the table
689    &job_status_delete($self->get_sid_index) if ($self->get_sid_index);
690}
691
692sub remove_from_cache {
693    my $self = shift ;
694
695    # Delete any used key pointing to ourself
696    map {
697        delete $CACHE->{$_}
698
699    } grep { $CACHE->{$_} == $self } keys(%{$CACHE}) ;
700
701    # Check any SID pointing to ourself
702    map {
703        delete SID_CACHE->{$_}
704
705    } grep { SID_CACHE->{$_} == $self } keys(%{&SID_CACHE}) ;
706
707    # Here cache should not have any reference to us, will die here
708    $self->is_cached(0);
709}
710
711sub clone {
712    return &thaw(&freeze($_[0])) ;
713}
714
715sub checked_in_db_timer {
716    my $self = shift ;
717    my @v = @_ ? ( $_[0] ? &ms() : 0 ) : () ;
718    return @v ? $self->_DB_this( 'CHK', 0, @v ) : $self->_DB_this( 'CHK', 0 );
719}
720
721sub check_update_in_db {
722    my $self = shift ;
723    my $sid  = $self->get_sid_index || $self->db_sid ;
724
725    unless ($sid) {
726        # Don't check if no SID available
727        &UPSTAT('STATUS-BAD-CHECK-UPDATE');
728        return 0 ;
729    }
730
731    # Don't recheck in DB too quickly, one time by minute is sufficient
732    return 0 if ($self->checked_in_db_timer > &ms(-60));
733
734    # Update checked in db timer to current time
735    $self->checked_in_db_timer(1);
736
737    my $updated = 0 ;
738    my @row = &is_job_status_updated($self->get_sid_index,$self->get_revision);
739    if (@row) {
740        # Update ourself
741        my @keys =qw(   LOCKID  JOBID  REV  TIMER  STEP
742                        STATE   BORN   DAY   AFP   STATUS  INFOS    );
743        foreach my $key (@keys) {
744            my $value = shift @row ;
745            unless (defined($value)) {
746                &StatusDebug("Bad update on $key value");
747                last ;
748            }
749            if ($key eq 'STATE') {
750                $self->{$key} = [ split(//,$value) ] ;
751
752            } else {
753                $self->{$key} = $value ;
754            }
755            $updated ++ ;
756        }
757        &StatusDebug("Updated $updated values from DB for job ".$self->{JOBID});
758        $self->set_dirty_bit ;
759    }
760    return $updated ;
761}
762
763sub _DB_name {
764    return $_[0]->{JOBID} ;
765}
766
767sub _DB_this {
768    my $self = shift ;
769    my $key  = shift or return 0 ;
770    my $def  = shift || 0 ;
771    my $job  = $self->_DB_name ;
772    $DB->{$job} = {} unless (exists($DB->{$job}));
773    $DB->{$job}->{$key} = shift if ( @_ and defined($_[0]) );
774    return exists($DB->{$job}->{$key}) ? $DB->{$job}->{$key} : $def ;
775}
776
777sub _DB_remove {
778    # Free memory to be called when no more needed. Should be called when the
779    # comparaison object is no more available in tied hash
780    my $self = shift ;
781    &UPSTAT('STATUS-DB-REMOVE');
782    &UPSTAT('STATUS-DB-REMOVE-AGE',$self->real_age);
783    delete $DB->{$self->_DB_name} if (exists($DB->{$self->_DB_name}));
784}
785
786sub db_revision {
787    my $self = shift ;
788    return @_ ? $self->_DB_this( 'REV', 0, @_ ) : $self->_DB_this( 'REV', 0 );
789}
790
791sub db_sid {
792    my $self = shift ;
793    return @_ ? $self->_DB_this( 'SID', 0, @_ ) : $self->_DB_this( 'SID', 0 );
794}
795
796sub db_checked {
797    my $self = shift ;
798    $self->_DB_this( 'SYNC_TIMER', 0, &ms() );
799    return $self->_DB_this( 'CHECKED', 0, $self->_DB_this( 'CHECKED', 0 )+1 );
800}
801
802sub checking_too_quickly {
803    return ( &ms() - $_[0]->_DB_this( 'SYNC_TIMER', 0 ) > 500 ) ? 0 : 1 ;
804}
805
806sub is_newer_than_db {
807    my $self = shift ;
808    return ( $self->db_revision < $self->get_revision ) ? 1 : 0 ;
809}
810
811sub sync_with_db {
812    my $self = shift ;
813    my $job  = $self->is_job ;
814
815    $! = 0 ;
816
817    # Get the current SID for that status
818    my $sid = $self->get_sid_index || $self->db_sid ;
819
820    # We update when we know DB rev is lower than current from cache
821    if ( $self->is_newer_than_db ) {
822        # Prepare row for update
823        my @row = ( $sid ) ;
824        push @row, map { defined($self->{$_}) ? $self->{$_} : "NULL" } qw{
825            LOCKID  JOBID  REV  TIMER  STEP
826            STATE  BORN    DAY    AFP  STATUS NBJOBS
827            };
828
829        # Add DestId list
830        my %destids = () ;
831        my @jobs = defined($self->{JOBS}) ?
832            grep { defined($_) } @{$self->{JOBS}} : () ;
833        map { $destids{uc($self->{$_}->{DESTID})} = 1 }
834            grep { defined($self->{$_}->{DESTID}) } @jobs ;
835        if (exists($destids{NULL})) { # Protect NULL DestID if defined
836            delete $destids{NULL} ;
837            $destids{'(NULL)'} = 1 ;
838        }
839        push @row, join(" ",keys(%destids)) || "none" ;
840
841        push @row, $self->{INFOS} || "" ;
842
843        # Check if update is done and we can continue the process
844        $sid = &job_status_update(@row) ;
845        unless ( $sid and ($sid == $self->db_sid or $self->db_sid($sid))) {
846            &Debug("Can't sync $job with @row to db row sid ".$sid);
847            return 0 ;
848        }
849
850        # Update revision
851        $self->db_revision($self->get_revision);
852
853    } elsif ( ! $self->dirty_bit(3) ) {
854        # Check if ABTERM status was updated by another service, this API do it
855        # at last one time by minute. We don't need to check GOOD status as they
856        # can't be updated to better status
857        &UPSTAT('STATUS-UPDATED-FROM-DB')
858            if ( $self->is_abterm and $self->check_update_in_db );
859    }
860
861    # Update our cache if gotten SID is different, check it as
862    # string as SIDs from standard thread are string by default (afp jobname)
863    if ( $self->get_sid_index ne $self->db_sid ) {
864        $self->set_sid_index( $self->db_sid );
865    }
866
867    # Keep statistics on control
868    &MAXSTAT('STATUS-RESYNCH-CHECKED',$self->db_checked);
869
870    # TODO Find why some time db_revision is greater than get_revision, maybe
871    # because a status update is done by a process on older status in its cache
872    unless ($self->db_revision and $self->db_revision >= $self->get_revision) {
873        &Debug("$job DB revision is " . $self->db_revision . " but ours is " .
874            $self->get_revision);
875        return 0 ;
876    }
877
878    return 1 ;
879}
880
881sub get_next_from_db {
882    my $self = shift ;
883
884    # Get the next row as a hash with column names as keys
885    my $row = &get_next_jobstatus( $self->get_sid_index, @_ ) ;
886
887    return 0 unless (ref($row) =~ /^HASH/);
888
889    # Invalidate ourself with empty JobID
890    $self->{JOBID} = '' ;
891    $self->is_job('INVALID JOB ROW');
892
893    # Update ourself to job in DB values
894    foreach my $key (keys(%{$row})) {
895        if ($key eq 'STATE') {
896            $self->{STATE} = [ split(//,$row->{STATE}) ] ;
897        } else {
898            $self->{$key}  = $row->{$key} ;
899        }
900    }
901
902    # Keep who we are now as valid job
903    $self->is_job($self->{JOBID}) if ($self->{JOBID});
904
905    1 ;
906}
907
908sub set_revision {
909    return ++ $_[0]->{REV}  ;
910}
911
912sub get_revision {
913    return $_[0]->{REV} ;
914}
915
916sub save_tied_versioning {
917    # We assume we can write to tied hash
918    my $self = shift ;
919    my $version = [ $self->get_revision, $self->get_lockid, $self->timer ] ;
920    #&StatusDebug("Saving ".$self->is_job." rev. ".$self->get_revision.
921    #    " versioning at key ".$self->get_sid_version_key(@_));
922    return $TIEHASH->{$self->get_sid_version_key(@_)} = &freeze($version);
923}
924
925sub is_tied_versioning {
926    my $self = shift ;
927    my $key = $self->get_sid_version_key(@_) ;
928      unless (defined($TIEHASH->{$key})) {
929        &StatusDebug("Versioning key $key is lost");
930        return ( 0, $LOCKID, 0 );
931    }
932    my @version = @{&thaw($TIEHASH->{$key})};
933    return @version ;
934}
935
936sub is_older_than_tied {
937    # When comparing with tied version, we should check first the timer when
938    # checking on the same LOCKID assuming this is the same service in the
939    # same place
940    my $self = shift ;
941    my ( $rev, $lockid, $timer ) = $self->is_tied_versioning(@_) ;
942
943    # Set ourself as dirty cached status if we are newer than tied version
944    $self->set_dirty_bit if ($self->get_revision > $rev);
945
946    #&StatusDebug("Comparing to @_ tied rev $rev, lockid $lockid, timer $timer");
947    return (( $LOCKID =~ /^$lockid$/ and $self->timer < $timer ) or
948        $self->get_revision < $rev ) ? $rev : 0 ;
949}
950
951sub status {
952    my $self = shift ;
953    return $self->{STATUS} = defined($self->{STATUS}) ? uc($self->{STATUS}) : "" ;
954}
955
956sub step_status {
957    my $self = shift ;
958
959    # Remove any undefined value, arise when starting a job
960    while (@_ and !defined($_[0])) { shift }
961
962    my $Step = shift || 0 ;
963
964    # Check step value to avoid updating job status on unused step
965    return 0 if ($Step < 0 or $Step > 12 );
966
967    if (@_) {
968        # Set step and later keep us in cache
969        my $State = shift || '.' ;
970
971        # Check we can update status
972        if ( $Step < 3 and $State =~ /^[._]$/ ) {
973            my $prev = $self->{STATE}->[$Step-1] || '0' ;
974            # Step starting, previous step must be 'o', '0', '_', '-' or 'A'
975            return 0 unless ( $prev =~ /^[0Ao_-]$/ );
976
977        } elsif ( $Step == 1 and $State eq 'o' ) {
978            # First call initializing a basic job status entry. This is
979            # done the first time in SpoolManager when processing AFP file
980            my $last = $self->{STATE}->[1] ; # Check job has still been started
981            my $restarted = (defined($last) and $last ne '_') ?
982                (exists($self->{FILE}) ? $self->{FILE} : $self->{AFP}) : '' ;
983
984            &Debug("Starting job " . $self->is_job .
985                ($restarted?"... ($restarted)":""));
986
987            # Initializes our birth and keep us in cache
988            $self->_set_defaults ;
989            $self->_born ;
990
991            # Check to remove old job status from previous start
992            if ($restarted) {
993                $self->{RESTARTED} = time ;
994                $self->{RESTART} = exists($self->{RESTART}) ?
995                    ++ $self->{RESTART} : 1 ;
996
997                # Set first step to number indicates a restart (with 0 -> >10)
998                $self->{STATE}->[0] = $self->{RESTART} > 9 ?
999                    0 : $self->{RESTART} ;
1000
1001                &Info("Seems to restart $restarted job".($self->{RESTART}>1?
1002                    " (restart #".$self->{RESTART}.")":""));
1003            }
1004
1005            # Set this status is new to being handled quickly
1006            $self->set_is_new ;
1007
1008        } elsif ( $Step == 1 and $State eq 'V' ) {
1009            # Specific to validation: force as new
1010            $self->set_is_new ;
1011            $State = 'o' ;
1012        }
1013
1014        # Don't update STEP if still at higher step (protection)
1015        $self->{STEP} = $Step unless ( $self->{STEP} > $Step );
1016
1017        # Update STATE checking if we can really update it
1018        $self->{STATE}->[$Step] = $State
1019            unless ( $self->{STATE}->[$Step] eq 'o'
1020            or ( $self->{STATE}->[$Step] eq 'A' and $State ne 'o' ) );
1021
1022        # Now set cache is dirty
1023        $self->set_dirty_bit ;
1024    }
1025
1026    # Still return the current step
1027    return $self->{STATE}->[$Step] eq '0' ? '_' : $self->{STATE}->[$Step] ;
1028}
1029
1030sub set_is_new {
1031    my $self = shift ;
1032
1033    # 1. Set the bit 3 to guaranty the status is not removed from tied hash
1034    # Before it is in DB
1035    $self->set_dirty_bit(3);
1036
1037    # 2. Use shared list of new status to inform a2p-status to handle this one
1038    #    This is done in SpoolManager thread
1039    my $shared_list = &SharedList( $self->get_lockid );
1040    push @{$shared_list}, $self->get_sid_key ;
1041    &FreeSharedList( $self->get_lockid );
1042
1043    # 3. Remove any stored Error or ABTERM
1044    delete $self->{ABTERM} ;
1045    delete $self->{ERRORS} ;
1046
1047    # 4. Remove any child error
1048    if (exists($self->{JOBS}) and ref($self->{JOBS}) =~ /^ARRAY/i) {
1049        map {
1050            delete $self->{$_} ;
1051
1052        } grep {
1053            defined($_) and $_ and exists($self->{$_})
1054        } @{$self->{JOBS}} ;
1055
1056        delete $self->{JOBS} ;
1057    }
1058}
1059
1060sub save_tied {
1061    my $self = shift ;
1062
1063    # Update our revision before trying to save it in tied hash
1064    # But disable setting revision if requested for the case of just renaming
1065    # status in tied hash
1066    $self->set_revision() unless ( @_ and $_[0] =~ /^not a revision$/ );
1067
1068    # Check we are really tied
1069    return 0 unless ( _is_tie_writer );
1070
1071    &TIMESTAT('SAVE-TIED');
1072
1073    # Reference is a number when we are knowing the SID index in DB
1074    # otherwise it's the first inserted key in tied hash given by get_sid_key
1075    my $sid_ref = $self->get_sid_index || $self->get_sid_key ;
1076
1077    # Update any important reference
1078    my $sidkey_re = qr/^$sid_ref$/ ;
1079    foreach my $ref (qw{ JOB AFP JOBID LINKEDAFP }) {
1080        # Skip undefined and not set
1081        next unless (defined($self->{$ref}) and $self->{$ref});
1082        # Skip the SID key itself
1083        next if ( $self->{$ref} =~ $sidkey_re );
1084        # Skip if ref is still set
1085        next if ( defined($TIEHASH->{$self->{$ref}}) and
1086            $TIEHASH->{$self->{$ref}} =~ $sidkey_re );
1087        # Then set reference
1088        $TIEHASH->{$self->{$ref}} = $sid_ref ;
1089        #&StatusDebug($self->{$ref}." replaced");
1090        # Delete old versioning
1091        if (exists($TIEHASH->{$self->{$ref}._rev_ext})) {
1092            delete $TIEHASH->{$self->{$ref}._rev_ext};
1093            #&StatusDebug($self->{$ref}._rev_ext. " deleted");
1094        }
1095    }
1096
1097    # Reset cached status
1098    my $cached = $self->is_cached ;
1099    $self->is_cached(0);
1100    $self->unset_dirty_bit ;
1101
1102    # Update tied timer before freeze
1103    $self->is_tied_timer(&ms());
1104
1105    # Prepare tied value as Storable freeze with DB information
1106    my $freeze = &freeze($self) ;
1107
1108    # Keep statistics on stored length in tied hash
1109    &MAXSTAT('MAX-TIED-STATUS-LENGTH',length($freeze));
1110    &UPSTAT('SAVE-TIED');
1111
1112    # Save tied value as Storable freeze
1113    my $sidkey = $self->get_sid_key ;
1114    $TIEHASH->{$sidkey} = $freeze ;
1115    $self->save_tied_versioning($sidkey) ;
1116    &Warn("$sidkey SID versioning still lost")
1117        unless (defined($TIEHASH->{$sidkey._rev_ext}));
1118    #&StatusDebug($self->is_job." saved in tied for key $sidkey rev. " .
1119    #    $self->get_revision);
1120
1121    # Reset cached status and others
1122    $self->is_cached($cached);
1123
1124    &TIMESTAT('SAVE-TIED');
1125    return $self ;
1126}
1127
1128sub DESTROY {
1129    # Remove debug message for minimal object
1130    $MUSTQUIT and &StatusDebug("Freeing " . __PACKAGE__ .
1131        " object memory " . $_[0]->get_sid_index . ":" . $_[0]->is_job .
1132        ( defined($_[0]->{INITIAL}) ? " (" . $_[0]->{INITIAL} . ")" : "" ));
1133}
1134
1135&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
1136
11371;
Note: See TracBrowser for help on using the repository browser.