source: A2P/a2p/A2P/AfpJob.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 33.0 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: AfpJob.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22# Class to manage an AFP Job
23#
24
25package A2P::AfpJob ;
26
27use strict;
28use Time::HiRes qw(gettimeofday tv_interval) ;
29use Data::Dumper ;
30use A2P::Globals ;
31use A2P::Syslog ;
32use A2P::Tools qw( ShortID );
33use A2P::JobStatus qw( a2pjobstate get_status ) ;
34use A2P::Status ;
35use A2P::DB qw( job_queue_update get_next_afpjob is_job_status_updated
36                a2p_db_connect a2p_db_connected get_serviceid_info
37                a2p_db_avoid_disconnect_in_forked );
38use A2P::Com qw( GetCom comJOB );
39use A2P::Client ;
40
41BEGIN {
42    our $VERSION = sprintf "%s", q$Rev: 1188 $ =~ /(\d[0-9.]+)\s+/ ;
43}
44our $VERSION ;
45
46my $Infos ;
47my $event ; # Private used to manage event
48our ( $notdone , $busy ) ;
49
50# Don't indent for freeze API
51$Data::Dumper::Indent = 0 ;
52$Data::Dumper::Purity = 1 ;
53$Data::Dumper::Terse  = 1 ;
54
55# Private constants
56sub _chained_request_qr { qr/^[a-z:]+$/ }
57
58sub new {
59    my $class = shift ;
60    my $mode  = shift || 0 ;
61    my $self  = { MODE => $mode, event => 0 } ;
62    bless $self , $class ;
63    #&Debug("new ".__PACKAGE__." object v$VERSION mode $mode created");
64    return $self ;
65}
66
67# Additionnal authorized actions by mode
68my $actions_by_mode = [
69    # User mode
70    [
71        'cancel',
72        #'pause',
73        #'resume',
74        #'ticket',
75        'update' ],
76    # Advanced mode
77    [ ]
78];
79
80sub actions {
81    # Return a list of possible actions by mode for this job
82    # The list is just an API binding list that can used, the caller should
83    # known how to use them with A2P::AfpJob objects
84    # start, validate & delete actions are implicit
85    my $self = shift ;
86    return @{$actions_by_mode->[$self->mode]} ;
87}
88
89sub afpspool {
90    # Set/return the AFPSPOOL for this job
91    my $self = shift ;
92    $self->{AFPSPOOL} = shift if (@_ and $_[0]);
93    return exists($self->{AFPSPOOL}) ? $self->{AFPSPOOL} : $AFPSPOOL ;
94}
95
96sub bad_request {
97    # Set/return the BAD REQUEST status
98    my $self = shift ;
99    $self->{BAD_REQUEST} = shift if @_ ;
100    return defined($self->{BAD_REQUEST}) ? $self->{BAD_REQUEST} : 0 ;
101}
102
103sub cancel {
104    # Cancel this job
105    my $self = shift ;
106
107    # Check if it's an event request
108    return $event > 0 ? 1 : 0
109        if ($event = $self->is_event_request($_[0],'cancel'));
110
111    # Initialization
112    my $name = $self->name ;
113
114    # Check we are knowing the service
115    my $service = $self->service ;
116    unless ($service) {
117        $self->error("Service for this job is not known");
118        return 0 ;
119    }
120
121    &Info("Cancelling A2P::AfpJob $name on service '$service'");
122    $self->info("Cancelling");
123
124    # Fork before
125    my $forked = fork ;
126    if (defined($forked)) {
127        if ($forked) {
128            &Info("Cancelling $name job...");
129            &Debug("Forked process to cancel $name" . ( $self->service ?
130                " on " . $self->service : "") )
131                if ($ADVANCED_DEBUGGING);
132            return 1 ;
133
134        } else {
135            &Debug("Cancelling $name job ");
136
137            # Set environment as in service
138            $self->fake_service ;
139
140            # Don't close DB connection when my job is done
141            &a2p_db_avoid_disconnect_in_forked();
142
143            # Get status object
144            my $status = $self->get_status_object ;
145            unless ($status) {
146                &Error("Can't get Status object of $name");
147                exit ;
148            }
149
150            # Initialize jobstatus
151            $self->report("Cancelling");
152            &a2pjobstate( $status->{AFP} , 12 , 'o',
153                {
154                    JOBID  => $name,
155                    STATUS => 'CANCELLING',
156                    INFOS  => 'Cancelling'
157                } )
158                or &Info("Can't set '$name' job status");
159
160            ( $notdone , $busy ) = ( 1, 0 ) ;
161            my $resync_timer = [ &gettimeofday() ] ;
162
163            # Try to cancel the job
164            while ( $notdone > 0 and $notdone <= 10 ) {
165                my @ret = &TellListener( $service,
166                    JobManager => &GetCom( comJOB , $name => 'cancel' ),
167                    sub {
168                        while (@_) {
169                            my $info = shift ;
170                            &Debug("Cancel $name: $info")
171                                if ($ADVANCED_DEBUGGING);
172                            $self->report($info);
173                            $notdone = 0 if ( $info =~ /^CANCEL.*OK$/i );
174                            $notdone = -1
175                                if ( $info =~ /ANSWERED REQUEST TIME OUT/
176                                or $info =~ /CANCEL.*KO/ );
177                            $busy++
178                                if ($info =~ /^ERROR.*MAX JOBS COUNT REACHED$/);
179                        }
180
181                        # Resync object in DB after a while
182                        if ( &tv_interval($resync_timer) > 5 ) {
183                            $resync_timer = [ &gettimeofday() ] ;
184                            $self->sync_with_jobs_queue('on_cancel') ;
185                        }
186                    });
187
188                &Debug("Cancel $name on $service returned '@ret'")
189                    if ( $ADVANCED_DEBUGGING and @ret );
190
191                if ( $notdone > 0 ) {
192                    last unless ( $notdone < 10 or $busy );
193                    &Warn("Job $name cancel not done, retrying soon");
194                    sleep 2 ;
195                    &Info("Cancelling again $name job");
196                    $notdone ++ unless $busy ;
197                    $busy = 0 ;
198                }
199            }
200
201            if ($notdone) {
202                $self->error("Job $name seems not cancelled");
203                &Warn("Job $name seems not cancelled");
204
205            } else {
206                &Info("Job $name cancelled");
207                &a2pjobstate( $status->{AFP} , 12 , 'o',
208                    {
209                        JOBID  => $name,
210                        INFOS  => 'Cancelled'
211                    } )
212                    or &Info("Can't set info on '$name' job status");
213            }
214
215            # Reset event and resync object in DB before quitting
216            $self->unset_event ;
217
218            exit $notdone ;
219        }
220
221    } else {
222        &Warn("Can't fork to cancel $name job: $!");
223    }
224
225    return 1 ;
226}
227
228sub date {
229    # Set/return a date information for this jobs as seconds since EPOCH
230    my $self = shift ;
231    $self->{DATE} = shift if (@_ and $_[0]);
232    return exists($self->{DATE}) ? $self->{DATE} : time ;
233}
234
235sub delete {
236    # Just delete the job from the queue
237    my $self = shift ;
238
239    my ( $name, $jid ) = ( $self->name, $self->{jid} );
240    &Debug("Deletion of $name (row $jid)");
241
242    # We set the revision to a negative value and update the object
243    return &job_queue_update( $jid, 0, 0, $name, '', -1 ) ;
244}
245
246sub details {
247    # TODO implement
248    # Return any useful informations for advanced mode
249    my $self = shift ;
250    return $self->infos ;
251}
252
253sub enqueue {
254    # Enqueue the A2P::AfpJob in DB table
255    my $self = shift ;
256    my $errs = $self->error ; # Get the current errors count
257
258    # 1. Set the mode if specified, or set it to user mode
259    $self->mode(@_ ? $_[0] : 0 );
260
261    # 2. Force to synchronize us in jobs_queue
262    # TODO Better check if the status is still in diagnostics table
263    $self->sync_with_jobs_queue(1);
264
265    # Return true if no new error has been set
266    return $self->error == $errs ? 1 : 0 ;
267}
268
269sub enqueue_status {
270    # Enqueue the A2P::Status for this job in DB table
271    my $self = shift ;
272    my $status = shift ;
273
274    # A chained request must be composed of lower case and char ':' as separator
275    my $chained = shift || '' ;
276
277    $self->{STATUS_OBJECT} = $status ;
278    $self->name($status->is_job);
279    $self->jobname($status->{AFP});
280    &Info("Enqueuing ".$status->is_job." in jobs_queue table...");
281
282    # Enqueue in diagnostics table
283    return &Error("AfpJob ".$self->name." not enqueued")
284        unless ($self->enqueue);
285
286    # Just ask update if not doing chained request
287    return $self->update
288        unless ( $chained =~ _chained_request_qr );
289
290    # Just ask for the event chained request
291    &Info("Requesting '$chained' events on ".$self->name);
292    return $event > 0 ? 1 : 0
293        if ( $event = $self->is_event_request( $self->lockid, $chained ) );
294}
295
296sub error {
297    # Add an error message, return an array of errors line
298    # The error status will be true if any error message is set
299    my $self = shift ;
300    push @{$self->{ERRORS}}, @_ if (@_);
301    return exists($self->{ERRORS}) ? @{$self->{ERRORS}} : () ;
302}
303
304sub fake_service {
305    # Update environment to fake a service
306    my $self = shift ;
307    $LOCKID = $self->lockid ;
308    $SCAN_SPOOL = 1 ;
309    $KEEP_JOBSTATUS = 1 ;
310
311    # Also consolidate db connection
312    $RECONNECT_TIMER = 0 ;
313    &a2p_db_connect ;
314}
315
316sub file {
317    # Set/return path to AFP file
318    my $self = shift ;
319    my $file = $self->{FILE} || '' ;
320
321    # Get the file from status if still not available or set
322    unless ($file) {
323        my $status = $self->get_status_object ;
324        $file = $status->{FILE} if (exists($status->{FILE}));
325    }
326
327    # We need to set the full path to file
328    my $afpspool = $self->afpspool ;
329    if ( $file and $file !~ m|^/| and $afpspool ) {
330        &Debug("Trying file '$file' in '$afpspool' as spool")
331            if ($ADVANCED_DEBUGGING);
332        $file = $afpspool . '/' . $file ;
333
334    } else {
335        my $jobname = $self->jobname ;
336        my $folder = $afpspool . '/' . $self->lockid ;
337        &Debug("Searching jobname '$jobname' in '$folder' folder")
338            if ($ADVANCED_DEBUGGING);
339        my $glob = $folder . '/*' . $jobname ;
340        my @list = glob($glob);
341        if (@list) {
342            &Debug("Trying ".(@list>1?"first of ":"")."found: @list")
343                if ($ADVANCED_DEBUGGING);
344            $file = shift @list ;
345
346        } else {
347            $file = '' ;
348            $self->error("No jobname '$jobname' found in '$folder' folder");
349        }
350    }
351
352    # Set error if job is not found
353    if ( $file and ! -d $file and -e $file ) {
354        &Debug("Found file is '$file' for ".$self->name)
355            if ($ADVANCED_DEBUGGING);
356
357    } else {
358        delete $self->{SIZE} ;
359        $self->error("File '$file' not found");
360        $file = '' ;
361    }
362
363    # Set also file size
364    $self->{SIZE} = -s $file if $file ;
365
366    return $self->{FILE} = $file ;
367}
368
369sub freeze {
370    my $self = shift ;
371    return Dumper($self) ;
372}
373
374sub get_next_from_db {
375    my $self = shift ;
376    my $service = shift ; # If provided, we are looking for event in jobs_queue
377
378    # Get the next row as a hash with column names as keys
379    my $row = &get_next_afpjob( $self->mode, $service )
380        or return 0 ;
381
382    return 0 unless (defined($row) and ref($row) =~ /^HASH/);
383
384    my $newself = &thaw($row->{THAWN}) ;
385
386    return 0 unless ( ref($newself) =~ /^A2P::AfpJob/ );
387
388    # Update info which can be not synchronized at some time
389    $newself->{jid} = $row->{JID} ;
390    $newself->{DBTIME} = $row->{DBTIME} ;
391    $newself->mode($self->mode);
392
393    $newself->sync_with_jobs_status ;
394
395    return $newself ;
396}
397
398sub _get_serviceid_of {
399    unless (&a2p_db_connected() and $Infos) {
400        &Debug("Initializing A2P::Infos object");
401        $Infos = &get_serviceid_info('check all');
402        return 0 unless $Infos ;
403    }
404    return $Infos->get_serviceid_of(@_);
405}
406
407sub get_size {
408    # return the AFP file size
409    my $self = shift ;
410    my $size = 0 ;
411    if (exists($self->{SIZE})) {
412        $size = $self->{SIZE} ;
413
414    } else {
415        my $file = $self->file ;
416        if ($file) {
417            # Stange, FILE is set but not SIZE... try again to get size
418            $! = 0 ;
419            $self->{SIZE} = $size = -s $file ;
420            $self->error("Can't get size of '$file': $!")
421                unless (defined($size));
422
423        } else {
424            $self->error("Can't know size of not found file");
425        }
426    }
427    return defined($size) ? $size : 0 ;
428}
429
430sub get_status_object {
431    # Return a status object
432    my $self = shift ;
433    return $self->{STATUS_OBJECT} if (exists($self->{STATUS_OBJECT}));
434    return $self->{STATUS_OBJECT} = new A2P::Status( {}, {} ) ;
435}
436
437sub handle_event {
438    # Check the current event and do the requested job
439    my $self = shift ;
440    return 0 unless ($self->{event} and $self->{EVENT});
441
442    # A chained request can be composed of lower case and char ':' as separator
443    my @events = grep { /^[a-z]+$/ } split( /:/, $self->{EVENT} );
444    # Remark: A chained events string should represent different event so the
445    # later test must match
446
447    # Check EVENT is really authorized
448    my @possible_actions = ( 'start' , 'validate', 'delete', $self->actions );
449
450    unless ( @events == grep { $self->{EVENT} =~ /$_/ } @possible_actions ) {
451        $self->error("Unauthorized to do '$self->{EVENT}'");
452        return 0 ;
453    }
454
455    # Really do the action
456    my  ( $ret , $deleted ) = ( 0 , 0 );
457    foreach my $todo (@events) {
458        &Debug("Doing evaluation of self->$todo on ".$self->name)
459            if ($ADVANCED_DEBUGGING);
460        unless ( $ret = eval '$self->' . $todo ) {
461            &Warn("Bad $todo event handling on ".$self->name);
462            last ;
463        }
464        $deleted ++ if ( $todo eq 'delete' );
465    }
466
467    if ($ret) {
468        &Debug("Done '".$self->{EVENT}."' on ".$self->name)
469            if ($ADVANCED_DEBUGGING);
470    } else {
471        &Error("Can't do '".$self->{EVENT}."' on ".$self->name);
472        map { &Error($_) } $self->error ;
473    }
474
475    # Anyway release events
476    $self->unset_event ;
477
478    return $ret ? ( $deleted ? -1 : 1 ) : 0 ;
479}
480
481sub info {
482    # Set info on the status object, can only be used in a2p-status service
483    my $self = shift ;
484    my $info = shift
485        or return 0 ;
486
487    my $status = $self->get_status_object ;
488    $status->{INFOS} = $info ;
489    &Debug("Information for ".$self->name." set to '$info'")
490        if ($ADVANCED_DEBUGGING);
491
492    # Set status is dirty and guaranty it will be push to status DB table
493    $status->set_dirty_bit ;
494    $status->set_dirty_bit(3) ;
495}
496
497my %afpjob_infos = (
498    DBTIME => 'Time update in DB', REV => 'diag_rev', NAME => 'jobname',
499    JOBNAME => 'afpname', JOBID => 'jobname', STATUS => 'status',
500    ERRORS => 'errors', FILE => 'file', SIZE => 'file_size', WARNINGS => 'warn',
501    DESTIDS => 'destid', jid => 'db_jid', EVENT => 'event', REPORTS => 'report',
502    SERVICE => 'a2p_service', AFPSPOOL => 'a2p_afpspool',
503    ZIPFILE => 'a2p_zipfile'
504);
505
506my %status_infos = (
507    STATE => 'states', REV => 'rev', LOCKID => 'lockid', STEP => 'step',
508    SID => 'status_row', JOBID => 'jobname', STATUS => 'status',
509    ABTERM => 'abterm', NBJOBS => 'nbjobs', AFP => 'afpname',
510    INFOS => 'infos', DAY => 'day', JOBS => 'jobs', RESTART => 'restart'
511);
512
513sub infos {
514    # Return any useful informations for at least user mode
515    my $self = shift ;
516    my $status = $self->get_status_object ;
517
518    # Update zipfile member
519    $self->search_zipfile ;
520
521    # Get infos from A2P::AfpJob
522    my %infos = map { $afpjob_infos{$_} =>
523        ref($self->{$_}) =~ /^array/i ?
524            join("\n", grep { length($_) } @{$self->{$_}}) : $self->{$_}
525        } keys(%afpjob_infos) ;
526
527    map { $infos{$status_infos{$_}} = ref($status->{$_}) =~ /^array/i ?
528            join("\n", grep { length($_) } @{$status->{$_}}) : $status->{$_}
529        } keys(%status_infos)
530            if ($self != $status);
531
532    # Adapt special case
533    if (exists($infos{states})) {
534        $infos{states} = join("",split(/\n/,$infos{states})) ;
535    }
536
537    # Delete any empty value
538    map { delete $infos{$_} } grep { ! length($infos{$_}) } keys(%infos);
539
540    return \%infos ;
541}
542
543sub is_abterm {
544    # Return true if the status is abterm
545    my $self = shift ;
546    return 1 if ($self->bad_request);
547    my $status = $self->get_status_object || 0 ;
548    return $status ? $status->is_abterm : 0 ;
549}
550
551sub is_done {
552    # Return true if the status is done
553    my $self = shift ;
554    my $status = $self->get_status_object || 0 ;
555    return $status ? $status->is_done : 0 ;
556}
557
558sub is_event_request {
559    # Return boolean if it's an event request, the boolean is negative if it's
560    # a faulty event request
561    my $self = shift ;
562    return 0 if ($self->{event});
563
564    # Check if lockid is provided
565    my $lockid = shift || $self->lockid ;
566
567    # A chained request must be composed of lower case and char ':' as separator
568    my $request = shift || 'update' ;
569
570    &Debug("Initiating $request on ".$self->name) if ($ADVANCED_DEBUGGING);
571    return $self->set_event_on( $request, $lockid ) ? 1 : -1 ;
572}
573
574sub jobname {
575    # Set/return the jobname
576    my $self = shift ;
577    $self->{JOBNAME} = shift if @_ ;
578    return exists($self->{JOBNAME}) ? $self->{JOBNAME} : '' ;
579}
580
581sub lockid {
582    # Set/return the lockid
583    my $self = shift ;
584    my $status = $self->get_status_object || $self ;
585    $status->{LOCKID} = shift if @_ ;
586    return exists($status->{LOCKID}) ? $status->{LOCKID} : '' ;
587}
588
589sub md5 {
590    # Set/return the md5 to AFP file
591    my $self = shift ;
592    unless (exists($self->{MD5})) {
593        my $file = $self->file ;
594        my $md5 = 0 ;
595        if ($file) {
596            my $md5cmd = qx/md5sum "$file"/ ;
597            my ( $result , $control ) = $md5cmd =~ /^(\w+)\s+(.+)$/ ;
598            if ( $control eq $file ) {
599                $md5 = $result ;
600            } else {
601                $self->warning("Can't retrieve md5 of '$file' file");
602            }
603        } else {
604            $self->warning("Can't compute md5 of not found file");
605        }
606        $self->{MD5} = $md5 ;
607    }
608    return exists($self->{MD5}) ? $self->{MD5} : 0 ;
609}
610
611sub mode {
612    # Return/set the mode
613    my $self = shift ;
614
615    if (@_) {
616        my $mode = shift || 0 ;
617        # Mode can only be 0: user mode or 1: advanced mode
618        $self->{MODE} = ($mode =~ /^\d+$/ and $mode)? 1 : 0 ;
619    }
620
621    return exists($self->{MODE}) ? $self->{MODE} : 0 ;
622}
623
624sub name {
625    # Return/set a name to identify this AFP job
626    my $self = shift ;
627
628    if (@_) {
629        my $name = shift ;
630        # Set a default name if called with a false value
631        $name = sprintf( "A2P-AFPJOB-%s", &ShortID )
632            unless (defined($name) and $name);
633        $self->{NAME} = $name ;
634    }
635
636    return $self->{NAME} = (defined($self->{NAME}) and $self->{NAME}) ?
637        $self->{NAME} : ( $self->{JOBID} ? $self->{JOBID} : (
638        $self->{JOBNAME} ? $self->{JOBNAME} : "" ));
639}
640
641sub nbjobs {
642    # Set/return the number of sub-jobs from status object
643    my $self = shift ;
644    my $status = $self->get_status_object || $self ;
645    $status->{NBJOBS} = shift if @_ ;
646    return exists($status->{NBJOBS}) ? $status->{NBJOBS} : 0 ;
647}
648
649sub open_ticket {
650    # TODO implement
651    # Open a ticket for this job
652    my $self = shift ;
653    &Debug("Opening ticket still not supported for ".$self->name);
654    return 0 ;
655}
656
657sub otherspools {
658    # Set/return the DONESPOOL & ERRORSPOOL for this job
659    my $self = shift ;
660    $self->{DONESPOOL}  = shift if (@_ and $_[0]);
661    $self->{ERRORSPOOL} = shift if (@_ and $_[0]);
662    return map { exists($self->{$_}) ? $self->{$_} : '' }
663        qw( DONESPOOL ERRORSPOOL ) ;
664}
665
666sub pause {
667    # TODO implement
668    # Pause this job
669    my $self = shift ;
670    &Debug("Pause still not supported for ".$self->name);
671    return 0 ;
672}
673
674sub report {
675    # Add reporting message, return an array of reporting lines
676    my $self = shift ;
677    push @{$self->{REPORTS}}, @_ if (@_);
678    return exists($self->{REPORTS}) ? @{$self->{REPORTS}} : () ;
679}
680
681sub resume {
682    # TODO implement
683    # Resume this paused job
684    my $self = shift ;
685    &Debug("Resume still not supported for ".$self->name);
686    return 0 ;
687}
688
689sub search_zipfile {
690    my $self = shift ;
691
692    return &Debug("DONESPOOL not set")  unless (defined($self->{DONESPOOL}));
693    return &Debug("ERRORSPOOL not set") unless (defined($self->{ERRORSPOOL}));
694    my $base = $self->lockid . '-' . $self->name ;
695
696    my @files = glob( $self->{DONESPOOL} . '/' . $base . '*.zip' );
697    push @files, glob( $self->{ERRORSPOOL} . '/' . $base . '*.zip' );
698
699    return $self->{ZIPFILE} = \@files ;
700}
701
702sub service {
703    # Set/return the service for the status object
704    my $self = shift ;
705    $self->{SERVICE} = shift if @_ ;
706    return defined($self->{SERVICE}) ? $self->{SERVICE} : '' ;
707}
708
709sub set_event_on {
710    # Set the event flag on this job in the jobs_queue table
711    my $self = shift ;
712      $event = shift ;
713
714    # Event is related to a lockid
715    my $lockid = shift || $self->lockid ;
716    unless ($lockid) {
717        $self->error("Can't set serviceid when lockid is unknown");
718        return 0 ;
719    }
720
721    # Get the serviceid for that LOCKID
722    my $serviceid = &_get_serviceid_of($lockid);
723    unless ($serviceid) {
724        $self->error("Can't found serviceid of $lockid lockid");
725        return 0 ;
726    }
727    &Debug("Got serviceid $serviceid for lockid $lockid");
728
729    my ( $name, $jid ) = ( $self->name, $self->{jid} );
730    &Debug("Setting event $serviceid on $name (row $jid)");
731
732    # Set event flag with serviceid and event callback
733    $self->{event} = $serviceid ;
734    $self->{EVENT} = $event ;
735
736    # Synchronize with table
737    return $self->sync_with_jobs_queue('event') ;
738}
739
740sub set_status_object {
741    # Set the status object
742    my $self = shift ;
743    my $status = shift ;
744    return 0 unless (ref($status) =~ /^A2P::Status/);
745    return $self->{STATUS_OBJECT} = $status ;
746}
747
748sub start {
749    # Start this job with 2 cases:
750    # 1. Called from Webmin -> we want it to be called by the right a2p-status
751    # 2. Called by a2p-status after it detects it has to start a job
752    my $self = shift ;
753
754    my $name = $self->name ;
755    unless ($self->{event}) {
756        # Check if lockid is provided
757        my $lockid = shift || $self->lockid ;
758
759        &Debug("Initiating start on $name") if ($ADVANCED_DEBUGGING);
760        return $self->set_event_on( 'start', $lockid );
761    }
762
763    # Really start now
764    my $file = $self->file ;
765
766    # Check file definition
767    unless ( $file and $file !~ m|/$| ) {
768        $self->error("File for this job is not known");
769        return 0 ;
770    }
771
772    # Check file existence
773    # TODO Add check file is good AFP file
774    unless ( ! -d $file and -e $file ) {
775        $self->error("File '$file' not available");
776        return 0 ;
777    }
778
779    # Check we are knowing the service
780    my $service = $self->service ;
781    unless ($service) {
782        $self->error("Service for this job is not known");
783        return 0 ;
784    }
785
786    &Info("Restarting A2P::AfpJob $name with file '$file'");
787    $self->info("Restarting");
788
789    # To start we would to call directly the service, but fork before
790    my $forked = fork ;
791    if (defined($forked)) {
792        if ($forked) {
793            &Debug("Forked process to restart $name on $service")
794                if ($ADVANCED_DEBUGGING);
795            return 1 ;
796
797        } else {
798            # Set environment as in service
799            $self->fake_service ;
800
801            # Don't close DB connection when my job is done
802            &a2p_db_avoid_disconnect_in_forked();
803
804            # Get status object
805            my $status = $self->get_status_object ;
806            unless ($status) {
807                &Error("Can't get Status object of $name");
808                exit ;
809            }
810
811            my $report = $self->{EVENT} =~ /delete/ ? 0 : 1 ;
812            $self->{event} = 0 if $report ;
813
814            # Initialize jobstatus with AfpName to keep restarts in mind
815            $self->report("Restarting") if ($report);
816            &a2pjobstate( $status->{AFP} , 1 , 'o',
817                {
818                    AFP    => $status->{AFP},
819                    JOBID  => $name,
820                    STATUS => 'STARTING',
821                    FILE   => $file,
822                    INFOS  => 'Restarting'
823                } )
824                or &Info("Can't initialize '$name' job status");
825
826            ( $notdone , $busy ) = ( 1, 0 ) ;
827            my $resync_timer = [ &gettimeofday() ] ;
828            my $ABTERM = "" ;
829
830            # Try to restart the job
831            while ( $notdone > 0 and $notdone <= 10 ) {
832                my @ret = &TellListener( $service,
833                    JobManager => &GetCom( comJOB , $name => $file ),
834                    sub {
835                        while (@_) {
836                            my $info = shift ;
837                            &Debug("Running $name: $info")
838                                if ($ADVANCED_DEBUGGING);
839                            $self->report($info) if ($report);
840                            $notdone = 0 if ( $info =~ /^JOBSTATUS.*OK$/i );
841                            $notdone = -1
842                                if ( $info =~ /ANSWERED REQUEST TIME OUT/
843                                or $info =~ /JOBSTATUS.*KO/ );
844                            $busy++
845                                if ($info =~ /^ERROR.*MAX JOBS COUNT REACHED$/);
846                            $ABTERM = $1
847                                if ( $info =~ /ABTERM:\s*(.*)$/i );
848                        }
849
850                        # Resync object in DB after a while
851                        if ( &tv_interval($resync_timer) > 5 ) {
852                            $resync_timer = [ &gettimeofday() ] ;
853                            $self->sync_with_jobs_queue('on_restart') ;
854                        }
855                    });
856
857                &Debug("Start $name on $service returned '@ret'")
858                    if ( $ADVANCED_DEBUGGING and @ret );
859
860                if ( $notdone > 0 ) {
861                    last unless ( $notdone < 10 or $busy );
862                    &Warn("Job $name not done, retrying soon");
863                    sleep 2 ;
864                    &Info("Retrying restart of $name job");
865                    $notdone ++ unless $busy ;
866                    $busy = 0 ;
867                }
868            }
869
870            if ($notdone) {
871                $self->error("Job $name not restarted");
872                &Error("Job $name not restarted");
873                &a2pjobstate( $status->{AFP} , 12 , 'o',
874                    {
875                        AFP    => $status->{AFP},
876                        JOBID  => $name,
877                        STATUS => 'KO',
878                        INFOS  => $ABTERM || 'Restart failed'
879                    } )
880                    or &Info("Can't set failed info on '$name' job status");
881
882            } else {
883                &a2pjobstate( $status->{AFP} , 12 , 'o',
884                    {
885                        AFP    => $status->{AFP},
886                        JOBID  => $name,
887                        INFOS  => 'Restart done'
888                    } )
889                    or &Info("Can't set info on '$name' job status");
890            }
891
892            # Reset event and resync object in DB before quitting
893            $self->unset_event if ($report);
894
895            exit $notdone ;
896        }
897    }
898
899    $self->error("Can't fork to ask $service service to restart $name");
900    return &Error("Can't fork to ask $service service to restart $name");
901}
902
903sub status {
904    # Set/return the status from status object
905    my $self = shift ;
906    return 'BAD REQUEST' if ($self->bad_request);
907    my $status = $self->get_status_object || $self ;
908    $status->{STATUS} = shift if @_ ;
909    return defined($status->{STATUS}) ? $status->{STATUS} : '' ;
910}
911
912sub step {
913    # Set/return the step from status object
914    my $self = shift ;
915    my $status = $self->get_status_object || $self ;
916    $status->{STEP} = shift if @_ ;
917    return defined($status->{STEP}) ? $status->{STEP} : 0 ;
918}
919
920sub _row_index {
921    my $self = shift ;
922    return ( $self->{jid}, $self->{MODE}, $self->{event}, $self->name,
923        $self->jobname, $self->{REV} ) ;
924}
925
926sub sync_with_jobs_queue {
927    # sync ourself to jobs_queue table in DB
928    my $self = shift ;
929
930    if ( ! ( defined($self->{jid}) and $self->{jid} )
931    or ( defined($_[0]) and $_[0] ) ) {
932        # Called with arg will push this object as new version
933        $self->{REV} = exists($self->{REV}) ? $self->{REV}+1 : 0 ;
934
935        # Call DB API with expected row
936        my $jid = &job_queue_update( $self->_row_index, $self->freeze );
937
938        # Set jid when it was unknown and it is returned, and update us in DB...
939        if (!$self->{jid} and $jid) {
940            $self->{REV} ++ ;
941            $self->{jid} = $jid ;
942            $jid = &job_queue_update( $self->_row_index, $self->freeze );
943        }
944
945        $self->error("Can't update $self->name in DB with JID $jid")
946            unless ($jid and $self->{jid} == $jid);
947
948    } else {
949        # Get the one from DB and return it if more recent
950        # The API should return a storable newer object or undef
951        my $thawn = &job_queue_update( $self->_row_index );
952
953        my $newself = &thaw($thawn) ;
954        if (ref($newself) =~ /^A2P::AfpJob/) {
955            # Only replace us to newer version if really more recent
956            $self = $newself if ($newself->{REV} >= $self->{REV});
957
958        } else {
959            $self->error("Bad object retrieved from DB (".ref($thawn).
960                ";".ref($newself).")");
961        }
962    }
963
964    # Return self as synchronized object
965    return $self ;
966}
967
968sub sync_with_jobs_status {
969    # sync ourself to jobs_status table in DB
970    my $self = shift ;
971    my $status = $self->get_status_object
972        or return &Debug("No status object for ".$self->name) ;
973
974    return &Debug("No status SID for ".$self->name)
975        unless ($status->get_sid_index);
976
977    # Reset checked in db timer
978    $status->checked_in_db_timer(0);
979    $status->check_update_in_db ;
980
981    # Return self as synchronized object
982    return $self ;
983}
984
985sub thaw {
986    my $freezed = shift ;
987    my $object = eval $freezed ;
988    return $object ;
989}
990
991sub unset_event {
992    # Set the event flag on this job in the jobs_queue table
993    my $self = shift ;
994
995    my ( $name, $jid ) = ( $self->name, $self->{jid} );
996    &Debug("Resetting event on $name (row $jid)");
997
998    # Set event flag with serviceid
999    $self->{event} = 0 ;
1000    delete $self->{EVENT} ;
1001
1002    # Synchronize with table
1003    return $self->sync_with_jobs_queue('reset_event') ;
1004}
1005
1006sub update {
1007    # Really do nothing, but this is a convenient way to update as action
1008    my $self = shift ;
1009
1010    # Check if it's an event request
1011    return $event > 0 ? 1 : 0
1012        if ($event = $self->is_event_request($_[0],'update'));
1013
1014    &Debug("Updating informations for ".$self->name);
1015    return 1 ;
1016}
1017
1018sub validate {
1019    my $self = shift ;
1020
1021    # Check if it's an event request
1022    return $event > 0 ? 1 : 0
1023        if ($event = $self->is_event_request($_[0],'validate'));
1024
1025    my $name = $self->name ;
1026
1027    # Fork before doing validation
1028    my $forked = fork ;
1029    if (defined($forked)) {
1030        if ($forked) {
1031            &Info("Validating $name job...");
1032            &Debug("Forked process to validate $name" . ( $self->service ?
1033                " on " . $self->service : "") )
1034                if ($ADVANCED_DEBUGGING);
1035            return 1 ;
1036
1037        } else {
1038            # Set environment as in service
1039            $self->fake_service ;
1040
1041            # Don't close DB connection when my job is done
1042            &a2p_db_avoid_disconnect_in_forked();
1043
1044            my $status = $self->get_status_object ;
1045            unless ($status) {
1046                &Error("Can't get Status object of $name");
1047                exit ;
1048            }
1049
1050            # Initialise status as new status
1051            &a2pjobstate( $status->{AFP} , 1 , 'V',
1052                {
1053                    AFP    => $status->{AFP},
1054                    JOBID  => $name,
1055                    STATUS => 'STARTING',
1056                    INFOS  => 'Validating'
1057                } )
1058                or &Info("Can't initialize '$name' job status validation");
1059
1060            foreach my $step (2..12) {
1061                my $state = $status->step_status($step);
1062                next unless ($state eq 'A');
1063                &Debug("Validating $name step $step...");
1064                &a2pjobstate( $status->{AFP} , $step , 'o',
1065                    {
1066                        JOBID  => $name,
1067                        STATUS => 'DONE',
1068                        INFOS  => 'Validating'
1069                    } )
1070                    or &Warn("Can't validate '$name' job status at step $step");
1071            }
1072
1073            # Validate jobstatus
1074            if ( &a2pjobstate( $status->{AFP} , 12 , 'o', {
1075                JOBID  => $name,
1076                STATUS => 'DONE',
1077                INFOS  => 'Validated'
1078            } ) ) {
1079                &Info("Validated $name job");
1080
1081            } else {
1082                &Error("Can't validate '$name' job status");
1083            }
1084
1085            # Finally quit this forked process
1086            exit ;
1087        }
1088
1089    } else {
1090        &Warn("Can't fork to validate $name status: $!");
1091    }
1092
1093    return 1 ;
1094}
1095
1096sub warning {
1097    # Add warning message, return an array of warning lines
1098    my $self = shift ;
1099    push @{$self->{WARNINGS}}, @_ if (@_);
1100    return exists($self->{WARNINGS}) ? @{$self->{WARNINGS}} : () ;
1101}
1102
1103&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
1104
11051;
1106
Note: See TracBrowser for help on using the repository browser.