source: A2P/a2p/A2P/DB.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 66.5 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: DB.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22
23package A2P::DB ;
24
25use strict ;
26use Time::HiRes qw( gettimeofday tv_interval usleep ) ;
27use DBI ;
28use A2P::Globals ;
29use A2P::Syslog ;
30use A2P::Infos ;
31
32BEGIN {
33    use Exporter ();
34
35    our ( $VERSION, @ISA, @EXPORT, @EXPORT_OK );
36
37    $VERSION = sprintf "%s", q$Rev: 1426 $ =~ /(\d[0-9.]+)\s+/ ;
38
39    @ISA = qw(Exporter);
40
41    # Export common API
42    @EXPORT = qw(
43        &a2p_db_connect &a2p_db_disconnect &a2p_db_connected &check_commit
44        &a2p_db_avoid_disconnect_in_forked
45        );
46
47    # Export OK jobstatus API
48    push @EXPORT_OK, qw(
49        &CheckJobsStatusTables &get_serviceid_info &job_status_update
50        &is_job_status_updated &get_next_jobstatus &job_status_delete
51        &CheckJobsQueueTable &job_queue_update &get_next_afpjob
52        );
53
54    # Export OK statistics API
55    push @EXPORT_OK, qw(
56        &a2p_db_statistics_ready &CheckJobsTables &stats_set_service
57        &stats_jobmanager &stats_converter
58        );
59}
60our $VERSION ;
61
62my $DBH ;
63my %TABLES ;
64my $CACHE ;
65
66my $JOB ;    # Array ref set in &stats_converter or &stats_jobmanager
67my $MONTH ;  # String set in &CheckJobsTables
68
69my %CREATE_TABLE = (
70    afpjobs_YYYYMM => q{
71CREATE TABLE afpjobs_YYYYMM (
72  afpjid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
73  JobID TEXT NOT NULL,
74  serviceid INTEGER UNSIGNED NOT NULL,
75  Date VARCHAR(8) NOT NULL,
76  DayTime TIME NOT NULL,
77  State VARCHAR(2) NOT NULL,
78  Errors SMALLINT UNSIGNED NOT NULL,
79  NBJobs SMALLINT UNSIGNED NOT NULL,
80  Chrono FLOAT NOT NULL,
81  Counted TINYINT UNSIGNED NOT NULL DEFAULT 0,
82  PRIMARY KEY(afpjid),
83  INDEX jobids_index(JobID(16),serviceid)
84)
85TYPE=InnoDB;
86    },
87    jobs_YYYYMM => q{
88CREATE TABLE jobs_YYYYMM (
89  jid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
90  afpjid INTEGER UNSIGNED NOT NULL,
91  Number SMALLINT UNSIGNED NOT NULL,
92  destid INTEGER UNSIGNED NOT NULL,
93  docid INTEGER UNSIGNED NOT NULL,
94  NbPages SMALLINT UNSIGNED NOT NULL,
95  State VARCHAR(2) NULL,
96  Timing FLOAT NOT NULL,
97  Chrono FLOAT NOT NULL,
98  Kind VARCHAR(1) NULL,
99  Errors TINYINT UNSIGNED NOT NULL,
100  Counted TINYINT UNSIGNED NOT NULL DEFAULT 0,
101  Jobname TEXT NULL,
102  PRIMARY KEY(jid),
103  UNIQUE INDEX unique_jids_index(afpjid,Number)
104)
105TYPE=InnoDB;
106    },
107    jobs_status => q{
108CREATE TABLE jobs_status (
109  sid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
110  serviceid INTEGER UNSIGNED NULL,
111  JobID TEXT NULL,
112  Revision INTEGER UNSIGNED NULL,
113  Timer BIGINT UNSIGNED NULL,
114  Step TINYINT UNSIGNED NULL,
115  Steps VARCHAR(13) NULL,
116  Born BIGINT UNSIGNED NULL,
117  Date VARCHAR(8) NULL,
118  Jobname TEXT NULL,
119  CurrentStatus TEXT NULL,
120  NbJobs INTEGER UNSIGNED NULL,
121  DestIDs TEXT NULL,
122  Infos TEXT NULL,
123  dbtime TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
124  PRIMARY KEY(sid),
125  INDEX jobs_status_index(serviceid, JobID(16)),
126  INDEX jobs_status_jobname_index(Jobname(16))
127)
128TYPE=InnoDB;
129    },
130    jobs_queue => q{
131# jobs_queue
132    CREATE TABLE jobs_queue (
133        jid INTEGER
134            UNSIGNED NOT NULL AUTO_INCREMENT,
135        Mode TINYINT
136            # Mode = 0 -> User mode ; Mode > 0 -> Advanced mode
137            UNSIGNED NULL,
138        serviceid INTEGER
139            UNSIGNED NULL,
140        JobID TEXT
141            NULL,
142        Jobname TEXT
143            NULL,
144        Revision INTEGER
145            UNSIGNED NULL,
146        Object TEXT
147            NULL,
148        dbtime TIMESTAMP
149            NULL DEFAULT CURRENT_TIMESTAMP
150            ON UPDATE CURRENT_TIMESTAMP,
151        PRIMARY KEY(jid),
152        INDEX mode_index(Mode,serviceid)
153    )
154    TYPE=InnoDB;
155    },
156    Services => q{
157CREATE TABLE Services (
158  serviceid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
159  Name TEXT NOT NULL,
160  Server TEXT NOT NULL,
161  LockID TEXT NOT NULL,
162  PRIMARY KEY(serviceid),
163  UNIQUE INDEX unique_services_index(Name(20),Server(30),LockID(10))
164)
165TYPE=InnoDB;
166    },
167    Count => q{
168CREATE TABLE Count (
169  countid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
170  serviceid INTEGER UNSIGNED NOT NULL,
171  YearMonth VARCHAR(6) NOT NULL,
172  afpjobs_nb INTEGER UNSIGNED NOT NULL,
173  afpjobs_tps FLOAT NOT NULL,
174  afpjobs_err INTEGER UNSIGNED NOT NULL,
175  jobs_nb INTEGER UNSIGNED NOT NULL,
176  jobs_err INTEGER UNSIGNED NOT NULL,
177  pages_nb INTEGER UNSIGNED NOT NULL,
178  PRIMARY KEY(countid),
179  UNIQUE INDEX unique_counts_index(serviceid,YearMonth)
180)
181TYPE=InnoDB;
182    },
183    Destinations => q{
184CREATE TABLE Destinations (
185  destid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
186  DestIdName TEXT NOT NULL,
187  FirstTime DATE NOT NULL,
188  LastTime DATE NOT NULL,
189  Count INTEGER UNSIGNED NOT NULL,
190  PRIMARY KEY(destid),
191  UNIQUE INDEX unique_destids_index(DestIdName(8))
192)
193TYPE=InnoDB;
194    },
195    Documents => q{
196CREATE TABLE Documents (
197  docid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
198  PageDef TEXT NOT NULL,
199  FormDef TEXT NOT NULL,
200  DocName TEXT NOT NULL,
201  PageFormats TEXT NOT NULL,
202  CopyGroups TEXT NOT NULL,
203  Logos TEXT NOT NULL,
204  FirstTime DATE NOT NULL,
205  LastTime DATE NOT NULL,
206  Count INTEGER UNSIGNED NOT NULL,
207  PRIMARY KEY(docid),
208  INDEX documents_index(PageDef(6),FormDef(6),DocName(4))
209)
210TYPE=InnoDB;
211    },
212    DocDestCount => q{
213CREATE TABLE DocDestCount (
214  ddid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
215  destid INTEGER UNSIGNED NOT NULL,
216  docid INTEGER UNSIGNED NOT NULL,
217  FirstTime DATE NOT NULL,
218  LastTime DATE NOT NULL,
219  Count INTEGER UNSIGNED NOT NULL,
220  PRIMARY KEY(ddid),
221  UNIQUE INDEX ddids_index(destid,docid)
222)
223TYPE=InnoDB;
224    }
225);
226
227my $Server ;
228if ( -e '/proc/sys/kernel/hostname' ) {
229    open *HOSTNAME , '<', "/proc/sys/kernel/hostname"
230        or die "Can't read hostname from kernel: $!";
231    $Server = <HOSTNAME> ;
232    close HOSTNAME ;
233} else {
234    $Server = qx/hostname -s/ ;
235}
236$Server = 'unknown' unless (defined($Server) and $Server);
237chomp $Server ;
238
239# String set in &stats_set_service, used in &set_serviceid_from
240my $service = "" ;
241my $serviceid = 0 ;       # Value set in &set_serviceid_from
242my $countid = 0 ;         # Value set in &set_countid
243my $afpjid = 0 ;          # Value set in &set_afpjid_from
244my $jid = 0 ;             # Value set in &set_jid_from
245my $destid = 0 ;          # Value set in &set_destid_from
246my $docid = 0 ;           # Value set in &set_docid
247my $ddid = 0 ;            # Value set in &set_ddid
248my $actual_commit = 0 ;   # Commit counter
249my $commit_timer  = [ &gettimeofday() ] ; # Commit timer
250my @commit_timers = () ;  # Commit times to check timer adjustement
251my $max_timer = 10 ;      # 10 seconds is the max timer, will be updated
252                          # as needed but not other 10s
253
254# Set to something returning 1 only for debugging during development
255sub __A2P_DB_DEBUG__ { 0 }
256
257################################################################################
258### Common API
259################################################################################
260
261my $connected_timer = [ &gettimeofday() ] ;
262sub a2p_db_connect {
263    if ($RECONNECT_TIMER) {
264        # Force a disconnection from DB
265        if (&tv_interval($connected_timer) >= $RECONNECT_TIMER) {
266            &UPSTAT('A2P-DB-RECONNECT-TIMEOUT');
267            $connected_timer = [ &gettimeofday() ] ;
268            &a2p_db_disconnect();
269        }
270    }
271
272    # Get a connection to DB
273    $DBH = DBI->connect_cached(
274        $DBI_DSN,
275        $DBI_USER ? $DBI_USER : $Progname ,
276        $DBI_PASSWD,
277        {
278            PrintError => 1,  RaiseError => 0,  AutoCommit => 0,
279            __statistics__ => __FILE__.__LINE__,
280            Profile => 0
281            # Can be 2, 4 or 6 to profile SQL requests (see man DBI::Profile)
282        })
283        or return &Error("Can't connect to '$DBI_DSN': " . $DBI::errstr);
284    &UPSTAT('A2P-DB-CONNECT-CACHED');
285
286    return defined($DBH) ? $DBH : 0 ;
287}
288
289sub a2p_db_connected {
290    # Check we are still connected to DB
291    my $connected = (defined($DBH) and $DBH) ? 1 : 0 ;
292    &UPSTAT('A2P-DB-CONNECTED-CHECK-'.$connected);
293
294    # Force a reconnection to DB if RECONNECT_TIMER is set and delay is passed
295    $connected = &a2p_db_connect()
296        if ($RECONNECT_TIMER
297        and &tv_interval($connected_timer) >= $RECONNECT_TIMER);
298
299    # Disconnect on error detection
300    if ( $connected and defined($DBH->err()) and $DBH->err() ) {
301        &Error("Disconnecting on DBI error detection: " . $DBH->errstr());
302        $connected = 0 ;
303    }
304
305    # Check the ping status a try reconnect is False
306    unless ( $connected and $DBH->ping ) {
307        &Info("DB Disconnection detected, reconnecting...") if $connected ;
308        &a2p_db_disconnect();
309        $connected = &a2p_db_connect();
310    }
311
312    return $connected ;
313}
314
315sub a2p_db_disconnect {
316    my @caller = caller ;
317    local $" = '-' ;
318    $caller[1] =~ s/.*\/// ;
319    &UPSTAT("A2P-DB-DISCONNECT-@caller");
320
321    if (defined($DBH)) {
322        # Check if profile was enabled and then dump as Debug
323        if (defined($DBH->{Profile}) and $DBH->{Profile}) {
324            &Debug("DB profiling: " . $DBH->{Profile}->format);
325        }
326
327        if ($actual_commit) {
328            # Force commit immediatly before disconnection
329            $actual_commit = $STATS_COMMIT_SKIP ;
330            &commit();
331            usleep 10000 ;
332        }
333
334        # Disconnect DB if present
335        $DBH->disconnect();
336        &UPSTAT('A2P-DB-DISCONNECTED');
337
338        # Erase DBH handler
339        undef $DBH ;
340
341        # Erase Some related variables
342        map { delete $TABLES{$_} } keys(%TABLES) ;
343    }
344}
345
346
347################################################################################
348### Private API
349################################################################################
350
351sub CheckTable {
352    my $name = shift ;
353    my $sub  = shift ;
354
355    return &Error("Can't check not specified table")
356        unless (defined($name) and $name);
357
358    return &Error("Not connected to DB") unless (&a2p_db_connected);
359
360    return $TABLES{$name} if (defined($TABLES{$name}) and $TABLES{$name});
361
362    #&Debug("Checking table '$name' exitence");
363    if (!defined($TABLES{$name})
364    and $DBH->do("SHOW TABLES LIKE '$A2PDB_TABLE_PREFIX$name'")>0) {
365        $TABLES{$name} = 1 ;
366
367    } else {
368        &Debug("Table '$name' seems not exists: ".$DBH->errstr());
369        $TABLES{$name} = 0 ;
370
371        # Only one recursive call is possible
372        unless (@_) {
373            my $statement ;
374            my $table_name = defined($sub) ? &$sub($name) : $name ;
375
376            if (defined($CREATE_TABLE{$table_name})) {
377                $statement = $CREATE_TABLE{$table_name} ;
378
379            } else {
380                return &Error("Table '$table_name' creation not supported");
381            }
382
383            # Update to create table if not exists
384            $statement =~
385                s/CREATE\s+TABLE\s+$table_name/CREATE TABLE IF NOT EXISTS $A2PDB_TABLE_PREFIX$name/mi ;
386
387            # Also update indexes names when a prefix is set
388            $statement =~ s/^(\s*)INDEX\s+/INDEX $A2PDB_TABLE_PREFIX/mig
389                if ($A2PDB_TABLE_PREFIX);
390
391            &Debug("Try to create table with $statement") if __A2P_DB_DEBUG__ ;
392
393            $DBH->do($statement)
394                or return &Error("Can't create '$name' table: ".$DBH->errstr());
395
396            # Force commit on creation
397            $DBH->commit()
398                 or return &Error("Bad commit: ".$DBH->errstr());
399
400            my $check = "SHOW TABLES LIKE '$A2PDB_TABLE_PREFIX$name'" ;
401            $TABLES{$name} = ($DBH->do($check) > 0) ?
402                (&Debug("Table '$name' available"),1)
403                :
404                &Error("Table '$name' not created: ".$DBH->errstr());
405        }
406    }
407
408    return $TABLES{$name} ;
409}
410
411my $cache_timer = [ &gettimeofday() ];
412
413sub check_commit {
414    # Add a call to this sub in a loop to force commit after max_timer seconds
415
416    # Check cache each minute to definitively remove value older than 10 minutes
417    if ( &tv_interval($cache_timer) > 60
418    and defined($CACHE->{'__expiration_list'})
419    and @{$CACHE->{'__expiration_list'}}) {
420        my @temp = () ;
421
422        # Don't pass more than one second to purge the cache
423        my $timeout_timer = [ &gettimeofday() ];
424        while ( @{$CACHE->{'__expiration_list'}}
425        and &tv_interval($timeout_timer) < 1 ) {
426            my $oldwhat = shift @{$CACHE->{'__expiration_list'}} ;
427            my $value_ref = &recursive_ref($CACHE,@{$oldwhat});
428            if (ref($value_ref) ne 'ARRAY') {
429                &Warn("(@{$oldwhat}) gives no valid value from cache, " .
430                    "cache may be corrupted");
431                next ;
432
433            } elsif (&tv_interval($value_ref->[1])<600) {
434                # Keep too young values
435                push @temp, $oldwhat ;
436                next ;
437            }
438
439            # Here this is a value to remove from cache
440            while (@{$oldwhat}) {
441                my $key = pop @{$oldwhat} ;
442                my $ref = &recursive_ref($CACHE,@{$oldwhat})
443                    or last;
444                if (ref($ref) =~ /^HASH/) {
445                    delete $ref->{$key} ;
446                    # Don't delete hash ref if other value are cached here
447                    last if (keys(%{$ref}));
448                }
449            }
450        }
451
452        # Update expiration list with not expired kept list
453        if (@{$CACHE->{'__expiration_list'}}) {
454            push @{$CACHE->{'__expiration_list'}}, @temp ;
455
456        } else {
457            $CACHE->{'__expiration_list'} = \@temp ;
458        }
459
460        # Reset cache timer
461        $cache_timer = [ &gettimeofday() ];
462    }
463
464    return 1
465        unless ( $actual_commit and &tv_interval($commit_timer) > $max_timer );
466
467    # Check to adapt futur timer check, reduce max_timer, it will be added
468    # with 1 second after the commit call, so it will be corrected just after
469    my $reduce_max_timer = (@commit_timers < $STATS_COMMIT_SKIP>>1) ;
470    # Only do a correction if not enough commit has occured, here not enought
471    # means commits rate is lower than half of expected rate
472
473    # Keep only timers lower than current max_timer interval
474    while (@commit_timers and &tv_interval($commit_timers[0]) > $max_timer ) {
475        shift @commit_timers ;
476    }
477
478    # Reset timer for next commit check
479    $commit_timer = [ &gettimeofday() ] ;
480
481    $actual_commit = $STATS_COMMIT_SKIP ;
482    &commit(); # max_timer is added with one second if lower than 10
483
484    # Then apply correction (so will be -2 first time, than -1)
485    $max_timer -= ($max_timer>2 and $reduce_max_timer) ? 2 : 0 ;
486}
487
488sub commit {
489    # Do nothing if AutoCommit if activated
490    return defined($actual_commit = 0)
491        if (defined($DBH->{AutoCommit}) and $DBH->{AutoCommit});
492
493    # Keep call time
494    push @commit_timers, [ &gettimeofday() ] ;
495
496    return 1
497        unless ( ++ $actual_commit > $STATS_COMMIT_SKIP );
498
499    # Update max_timer if still not to 10 seconds
500    $max_timer ++ if ( $max_timer < 10 );
501
502    $actual_commit = 0 ;
503    $DBH->commit()
504        or return &Error("Bad commit: ".$DBH->errstr());
505
506    1 ;
507}
508
509sub get_table_hash {
510    my $table = shift ;
511    my $key   = shift ;
512
513    my $internal_stat_key = 'DB-GET-TABLE-HASH-' . uc($table) ;
514    &TIMESTAT($internal_stat_key); &UPSTAT($internal_stat_key);
515
516    return &Error("Can't get not specified table")
517        unless (defined($table) and $table);
518
519    return &Error("Can't get hash with unspecified key")
520        unless (defined($key) and $key);
521
522    unless (exists($TABLES{$table})) {
523        &Debug("Force checking '$table' table is available");
524        &CheckTable($table);
525    }
526
527    return &Error("Can't get hash for unknown table '$table'")
528        unless (defined($TABLES{$table}) and $TABLES{$table});
529
530    my $sth = $DBH->prepare_cached("SELECT * FROM $A2PDB_TABLE_PREFIX$table")
531        or return &Error("Can't prepare request to get '$table' table: " .
532            $DBH->errstr());
533
534    $sth->execute()
535        or return &Error("Can't get '$table' table: ".$DBH->errstr());
536
537    my $hashref ;
538    # Check if we can call fetchall_hashref with an ARRAY ref
539    if ( ref($key) =~ /^ARRAY/ and $DBI::VERSION < 1.48 ) {
540        # Simulate the same API than later DBI version
541        my $arrayref = $sth->fetchall_arrayref({})
542            or return &Error("Can't get '$table' table as array of hash: " .
543            $DBH->errstr());
544
545        $hashref = {} ;
546        while (@{$arrayref}) {
547            my $row = shift @{$arrayref} ;
548
549            my $newref = $hashref ;
550            my @keys = @{$key} ;
551            while (@keys) {
552                my $nextkey = shift @keys ;
553                my $this = $row->{$nextkey} ;
554                # Finally affect the row to the last ref
555                $newref->{$this} = @keys ? {} : $row
556                    unless (exists($newref->{$this}));
557                $newref = $newref->{$this} ;
558            }
559        }
560
561    } else {
562        $hashref = $sth->fetchall_hashref($key)
563            or return &Error("Can't get '$table' table as hash: " .
564            $DBH->errstr());
565    }
566
567    $sth->finish ;
568
569    &MAXSTAT($internal_stat_key.'-KEYCOUNT',scalar(keys(%{$hashref})));
570    &TIMESTAT($internal_stat_key);
571
572    return $hashref ;
573}
574
575
576################################################################################
577### Statistics API
578################################################################################
579my $Statistics_ready = undef ;
580sub a2p_db_statistics_ready {
581    return $Statistics_ready if (defined($Statistics_ready));
582
583    # Check needed tables are availables
584    foreach my $table (keys(%CREATE_TABLE)) {
585        next if ($table =~ /jobs/);
586        &CheckTable($table)
587            or return $Statistics_ready = 0 ;
588    }
589
590    return $Statistics_ready = 1 ;
591}
592
593sub CheckJobsTables {
594    my $date = shift || '' ;
595    ( $MONTH ) = $date =~ /^(\d{6})/ ;
596
597    &CheckTable('afpjobs_'.$MONTH,\&job_tablename_check)
598        or return &Error("No table available for afpjobs_$MONTH month");
599
600    &CheckTable('jobs_'.$MONTH,\&job_tablename_check)
601        or return &Error("No table available for jobs_$MONTH month");
602}
603
604sub job_tablename_check {
605    my $name = shift || '' ;
606    &Debug("Checking table name '$name'");
607    return $name =~ /^(\w*jobs_)/ ? $1 . 'YYYYMM' : $name ;
608}
609
610sub stats_set_service {
611    $service = shift || 'unknown' ;
612}
613
614sub mysql_insert_id {
615    my $select = $DBH->prepare_cached(qq{
616        SELECT LAST_INSERT_ID()
617    });
618
619    $select->execute()
620        or return &Error("Can't get last inserted id: ".$DBH->errstr());
621
622    my ( $last_id ) = $select->fetchrow_array ;
623    $select->finish ;
624
625    return $last_id ? $last_id : &Error("Can't get valid last inserted id") ;
626}
627
628sub recursive_ref {
629    my $ref = shift ;
630    return $ref unless @_ ;
631    my $key = shift ;
632    unless (ref($ref) =~ /^HASH/i and exists($ref->{$key})) {
633        &Debug("No value referenced as $ref -> $key") if __A2P_DB_DEBUG__ ;
634        return 0 ;
635    }
636    return &recursive_ref( $ref->{$key}, @_ );
637}
638
639sub cached {
640    my $what  = shift || [] ;
641    my $value = undef ;
642
643    return $value unless ( @{$what} > 1 ) ;
644
645    if (@_) {
646        my $which_list = ($what->[0]).'__list' ;
647        $value = shift ;
648
649        # Set cache for a value
650        push @{$CACHE->{$which_list}}, [ @{$what} ] ;
651
652        # Push old values in a list with expiration checked later
653        if (@{$CACHE->{$which_list}} > $MAX_CACHED_DB) {
654            my $oldwhat = shift @{$CACHE->{$which_list}} ;
655            push @{$CACHE->{'__expiration_list'}}, $oldwhat ;
656        }
657
658        # Prepare hash tree cache
659        &Debug("Caching $value as ".join('=>',@{$what})) if __A2P_DB_DEBUG__ ;
660        my $whatref = $CACHE ;
661        my $whatkey = shift @{$what} ;
662        while (@{$what}) {
663            unless (defined($whatref->{$whatkey})) {
664                $whatref->{$whatkey} = {} ;
665                &Debug("Cache hash $whatref -> $whatkey initialized")
666                    if __A2P_DB_DEBUG__ ;
667            }
668            $whatref = $whatref->{$whatkey} ;
669            $whatkey = shift @{$what} ;
670        }
671
672        # Set cache and return now to not update cache statistics
673        $whatref->{$whatkey} = [ $value , [ &gettimeofday() ]];
674        &Debug("$value cache as $whatref -> $whatkey") if __A2P_DB_DEBUG__ ;
675
676    } elsif (defined($CACHE)) {
677        my $value_ref = &recursive_ref($CACHE,@{$what});
678        if ($value_ref and ref($value_ref) =~ /^ARRAY/) {
679            $value = $value_ref->[0] ;
680            # Update value timer
681            $value_ref->[1] = [ &gettimeofday() ] ;
682        }
683
684        # Update internal hit/miss cache statistics
685        defined($value) ?
686            &UPSTAT('CACHE-HIT--'.($what->[0]))
687            :
688            &UPSTAT('CACHE-MISS-'.($what->[0])) ;
689    }
690
691    return $value ;
692}
693
694sub set_afpjid_from {
695    my $JobID = shift ;
696
697    return &Error("No JobID provided") unless $JobID ;
698
699    &Warn("Trying to set afpjid for $JobID and $MONTH without serviceid set")
700        unless ($serviceid);
701
702    my $cache_key_list = [ 'afpjid', $MONTH, $serviceid, $JobID ] ;
703
704    # Return cached value
705    my $cached = &cached($cache_key_list) ;
706    return $afpjid = $cached if (defined($cached) and $cached);
707
708    my $select = $DBH->prepare_cached(qq{
709        SELECT afpjid FROM ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH
710            WHERE JobID=? and serviceid=?
711        });
712
713    $select->execute($JobID,$serviceid)
714        or return &Error("Can't select afpjid from '$JobID' JobID " .
715            "in afpjobs table: " . $DBH->errstr());
716
717    ( $afpjid ) = $select->fetchrow_array ;
718    $select->finish ;
719
720    unless (defined($afpjid) and $afpjid) {
721        # Insert a new row for this job
722        my $insert = $DBH->prepare_cached(qq{
723            INSERT INTO ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH
724            (JobID,serviceid,Date,DayTime,State,Errors,NBJobs,Chrono,Counted)
725            VALUES(?,?,?,?,'  ',0,0,0,0)
726        });
727
728        $insert->execute($JobID,$serviceid,$JOB->[0],$JOB->[1])
729            or return &Error("Can't insert '$JobID' JobID in afpjobs table: " .
730                $DBH->errstr());
731
732        $afpjid = &mysql_insert_id() if (&commit());
733    }
734
735    return $afpjid = &Error("Can't get a valid afpjid")
736        unless (defined($afpjid) and $afpjid);
737
738    &Debug("Set afpjid to $afpjid") if (__A2P_DB_DEBUG__ and defined($afpjid));
739
740    return &cached($cache_key_list, $afpjid);
741}
742
743sub set_countid {
744
745    return &Error("Can't set new counter without valid serviceid")
746        unless (defined($serviceid) and $serviceid);
747
748    my $cache_key_list = [ 'countid', $MONTH, $serviceid ] ;
749
750    # Return cached value
751    my $cached = &cached($cache_key_list) ;
752    return $countid = $cached if (defined($cached) and $cached);
753
754    # Search for countid
755    my $select = $DBH->prepare_cached(qq{
756        SELECT countid FROM ${A2PDB_TABLE_PREFIX}Count
757        WHERE serviceid=? and YearMonth=?
758        });
759
760    $select->execute($serviceid,$MONTH)
761        or return &Error("Can't select countid in Count table: " .
762            $DBH->errstr());
763
764    ( $countid ) = $select->fetchrow_array ;
765    $select->finish ;
766
767    unless ((defined($countid) and $countid) or @_) {
768        # Insert a new row for this service and month
769        my $insert = $DBH->prepare_cached(qq{
770            INSERT INTO ${A2PDB_TABLE_PREFIX}Count
771                (serviceid,YearMonth,afpjobs_nb,afpjobs_tps,afpjobs_err,
772                jobs_nb,jobs_err,pages_nb)
773                VALUES(?,?,0,0,0,0,0,0)
774        });
775
776        $insert->execute($serviceid,$MONTH)
777            or return &Error("Can't insert counter in Count table: " .
778                $DBH->errstr());
779
780        $countid = &mysql_insert_id() if (&commit());
781    }
782
783    return $countid = &Error("Can't get a valid countid")
784        unless (defined($countid) and $countid);
785
786    &Debug("Set countid to '$countid'") if __A2P_DB_DEBUG__ ;
787    return &cached($cache_key_list, $countid) ;
788}
789
790sub set_ddid {
791
792    my $cache_key_list = [ 'ddid', $destid, $docid ] ;
793
794    # Return cached value
795    my $cached = &cached($cache_key_list) ;
796    return $ddid = $cached if (defined($cached) and $cached);
797
798    # Search for ddid
799    my $select = $DBH->prepare_cached(qq{
800        SELECT ddid FROM ${A2PDB_TABLE_PREFIX}DocDestCount
801        WHERE destid=? AND docid=?
802        });
803
804    $select->execute($destid,$docid)
805        or return &Error("Can't select ddid in DocDestCount table: " .
806            $DBH->errstr());
807
808    ( $ddid ) = $select->fetchrow_array ;
809    $select->finish ;
810
811    unless (defined($ddid) and $ddid) {
812        # Insert a new row for this document
813        my $insert = $DBH->prepare_cached(qq{
814            INSERT INTO ${A2PDB_TABLE_PREFIX}DocDestCount
815                (destid,docid,FirstTime,LastTime,Count)
816                VALUES(?,?,?,?,0)
817        });
818
819        $insert->execute($destid,$docid,$JOB->[0],$JOB->[0])
820            or return &Error("Can't insert docdest counter " .
821                "in DocDestCount table: " . $DBH->errstr());
822
823        $ddid = &mysql_insert_id() if (&commit());
824    }
825
826    return $ddid = &Error("Can't get a valid ddid")
827        unless (defined($ddid) and $ddid);
828
829    &Debug("Set ddid to '$ddid'") if __A2P_DB_DEBUG__ ;
830    return &cached($cache_key_list, $ddid) ;
831}
832
833sub set_destid_from {
834    my $DestID = shift || '(none)' ;
835    $DestID = '(NULL)' if ($DestID =~ /^NULL$/i);
836    $DestID = '(none)' if ($DestID =~ /^\s*$/);
837
838    my $cache_key_list = [ 'destid', $DestID ] ;
839
840    # Return cached value
841    my $cached = &cached($cache_key_list) ;
842    return $destid = $cached if (defined($cached) and $cached);
843
844    # Get serviceid from table or insert a new one
845    my $destinations = &get_table_hash('Destinations','DestIdName');
846
847    return &Error("Can't get Destinations hash")
848        unless (defined($destinations) and $destinations);
849
850    $destid = $destinations->{$DestID}->{'destid'}
851        if (defined($destinations->{$DestID}));
852
853    unless (defined($destid) and $destid) {
854        # Here we need to insert a new service and get the new serviceid
855        &Debug("Inserting new DestID '$DestID' after last '$destid' one")
856            if __A2P_DB_DEBUG__ ;
857
858        my $insert = $DBH->prepare_cached(qq{
859            INSERT INTO ${A2PDB_TABLE_PREFIX}Destinations
860            (DestIdName, FirstTime, LastTime, Count)
861            VALUES(?,?,?,0)
862            });
863
864        $insert->execute($DestID,$JOB->[0],$JOB->[0])
865            or return &Error("Can't insert '$DestID,$JOB->[0],$JOB->[0],0' " .
866                "in Destinations table: ".$DBH->errstr());
867
868        # Get last inserted id
869        $destid = &mysql_insert_id() if (&commit());
870    }
871
872    return $destid = &Error("Can't get a valid destid")
873        unless (defined($destid) and $destid);
874
875    &Debug("Last inserted destination has '$destid' id") if __A2P_DB_DEBUG__ ;
876    return &cached($cache_key_list, $destid );
877}
878
879sub set_docid {
880    # Set cache list avoiding empty keys
881    my $cache_key_list = [
882        'docid', @{$JOB}[5..7], map { $_.'_' } @{$JOB}[8..10]
883        ] ;
884
885    # Return cached value
886    my $cached = &cached($cache_key_list) ;
887    return $docid = $cached if (defined($cached) and $cached);
888
889    # Search for docid
890    my $select = $DBH->prepare_cached(qq{
891        SELECT docid FROM ${A2PDB_TABLE_PREFIX}Documents
892        WHERE PageDef=? AND FormDef=? AND DocName=? AND PageFormats=?
893            AND CopyGroups=? AND Logos=?
894        });
895
896    $select->execute(@{$JOB}[5..10])
897        or return &Error("Can't select docid in Documents table: " .
898            $DBH->errstr());
899
900    ( $docid ) = $select->fetchrow_array ;
901    $select->finish ;
902
903    unless (defined($docid) and $docid) {
904        # Insert a new row for this document
905        my $insert = $DBH->prepare_cached(qq{
906            INSERT INTO ${A2PDB_TABLE_PREFIX}Documents
907            (PageDef,FormDef,DocName,PageFormats,CopyGroups,
908            Logos,FirstTime,LastTime,Count)
909            VALUES(?,?,?,?,?,?,?,?,0)
910        });
911
912        $insert->execute(@{$JOB}[5..10],$JOB->[0],$JOB->[0])
913            or return &Error("Can't insert document in Documents table: " .
914                $DBH->errstr());
915
916        # Get last inserted id
917        $docid = &mysql_insert_id() if (&commit());
918    }
919
920    return $docid = &Error("Can't get a valid docid")
921        unless (defined($docid) and $docid);
922
923    &Debug("Set docid to '$docid'") if __A2P_DB_DEBUG__ ;
924    return &cached($cache_key_list, $docid);
925}
926
927sub set_jid_from {
928    my $Number = shift || 0 ;
929
930    my $cache_key_list = [ 'jid', $MONTH, $afpjid, $Number ] ;
931
932    # Return cached value
933    my $cached = &cached($cache_key_list) ;
934    return $jid = $cached if (defined($cached) and $cached);
935
936    my $select = $DBH->prepare_cached(qq{
937        SELECT jid FROM ${A2PDB_TABLE_PREFIX}jobs_$MONTH
938        WHERE afpjid=? and Number=?
939        });
940
941    $select->execute($afpjid,$Number)
942        or return &Error("Can't select jid from '$Number' Number " .
943            " in jobs table: " . $DBH->errstr());
944
945    ( $jid ) = $select->fetchrow_array ;
946    $select->finish ;
947
948    unless (defined($jid) and $jid) {
949        # Insert a new row for this job
950        my $insert = $DBH->prepare_cached(qq{
951            INSERT INTO ${A2PDB_TABLE_PREFIX}jobs_$MONTH
952                (afpjid,Number,destid,docid,
953                NbPages,State,Timing,Chrono,Kind,Errors,Counted,Jobname)
954                VALUES(?,?,?,0,0,'  ',0,0,' ',0,0,'')
955        });
956
957        $insert->execute($afpjid,$Number,$destid)
958            or return &Error("Can't insert job '$Number' for '$afpjid' afpjob "
959                . " in jobs table: " . $DBH->errstr());
960
961        $jid = &mysql_insert_id() if (&commit());
962    }
963
964    return $jid = &Error("Can't get a valid jid")
965        unless (defined($jid) and $jid);
966
967    &Debug("Set jid to '$jid'") if (__A2P_DB_DEBUG__);
968    return &cached($cache_key_list, $jid);
969}
970
971sub set_serviceid_from {
972    my $LockID = shift || 'unknown' ;
973
974    my $cache_key_list = [ 'serviceid', $Server, $service, $LockID ] ;
975
976    # Return cached value
977    my $cached = &cached($cache_key_list) ;
978    return $serviceid = $cached if (defined($cached) and $cached);
979
980    &Debug("Services table not cached") if __A2P_DB_DEBUG__ ;
981
982    # Get serviceid from table or insert a new one
983    my $Services = &get_table_hash('Services','serviceid')
984        or return &Error("Can't get Services hash");
985
986    # Parse DB table returned as hash
987    my $found = 0 ;
988    foreach my $id (keys(%{$Services})) {
989        next unless ($Services->{$id}->{Name}   =~ /^$service$/ );
990        next unless ($Services->{$id}->{Server} =~ /^$Server$/i );
991        next unless ($Services->{$id}->{LockID} =~ /^$LockID$/  );
992        &Debug("Extracted serviceid '$id' from Services table")
993            if __A2P_DB_DEBUG__ ;
994        $found ++ ;
995        $serviceid = $id ;
996        last ;
997    }
998
999    unless ($found and $serviceid) {
1000        # Here we need to insert a new service and get the new serviceid
1001        &Debug("Inserting new service '$service,$Server,$LockID' after last "
1002            . "'$serviceid' one") if __A2P_DB_DEBUG__ ;
1003
1004        my $insert = $DBH->prepare_cached(qq{
1005            INSERT INTO ${A2PDB_TABLE_PREFIX}Services
1006            (Name, Server, LockID)
1007            VALUES(?,?,?)
1008            });
1009
1010        $insert->execute($service,$Server,$LockID)
1011            or return &Error("Can't insert '$service,$Server,$LockID' " .
1012                "in Services table: " . $DBH->errstr());
1013
1014        # Get last inserted id
1015        $serviceid = &mysql_insert_id() if (&commit());
1016
1017        &Debug("Last inserted service has $serviceid id") if __A2P_DB_DEBUG__ ;
1018    }
1019
1020    return $serviceid = &Error("Can't get a valid serviceid")
1021        unless (defined($serviceid) and $serviceid);
1022
1023    &Debug("Keeping $serviceid id in cache") if __A2P_DB_DEBUG__ ;
1024    $cached = &cached($cache_key_list, $serviceid);
1025    &Debug("$cached value kept in cache") if __A2P_DB_DEBUG__ ;
1026    return $cached ;
1027}
1028
1029sub stats_converter {
1030    $JOB = shift ;
1031
1032    return &Error("Converter line: Not connected to DB")
1033        unless (&a2p_db_connected());
1034
1035    # Reset global value for that line
1036    ( $afpjid , $jid , $countid, $serviceid, $destid ) = ( 0, 0, 0, 0, 0 );
1037    ( $docid , $ddid ) = ( 0, 0 );
1038
1039    # 1. Check tables for job are availables
1040    &CheckJobsTables($JOB->[0])
1041        or return &Error("Converter line: " .
1042            "Tables not available for $JOB->[0] date");
1043
1044    # 2. Update serviceid as we known LOCKID
1045    &set_serviceid_from($JOB->[13])
1046        or return &Error("Converter line: Can't find DB serviceid");
1047
1048    # 3. Set countid
1049    &set_countid()
1050        or return &Error("Converter line: Can't find DB countid");
1051
1052    # 4. Set afpjid
1053    &set_afpjid_from($JOB->[2])
1054        or return &Error("Converter line: Can't find DB afpjid");
1055
1056    # 7. Check if it has been counted
1057    my $select = $DBH->prepare_cached(qq{
1058        SELECT Counted FROM ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH
1059        WHERE afpjid=?
1060        });
1061
1062    $select->execute($afpjid)
1063        or return &Error("Can't select job row from '$jid' jid in jobs table: "
1064            . $DBH->errstr());
1065
1066    my ( $afpjid_counted ) = $select->fetchrow_array ;
1067    $select->finish ;
1068
1069    # 5. Set destid
1070    &set_destid_from( $JOB->[4] )
1071        or return &Error("Converter line: Can't find DB destid");
1072
1073    # 6. Set jid
1074    my ( $number ) = $JOB->[3] =~ /^.*-AFP(\d+)$/ ;
1075    &set_jid_from( $number )
1076        or return &Error("Converter line: Can't find DB jid");
1077
1078    # 7. Check if it has been counted
1079    $select = $DBH->prepare_cached(qq{
1080        SELECT Counted FROM ${A2PDB_TABLE_PREFIX}jobs_$MONTH
1081        WHERE jid=?
1082        });
1083
1084    $select->execute($jid)
1085        or return &Error("Can't select job row from '$jid' jid in jobs table: "
1086            . $DBH->errstr());
1087
1088    my ( $counted ) = $select->fetchrow_array ;
1089    $select->finish ;
1090
1091    # 8. Still return if it still has been counted
1092    return $counted if ($counted & 1);
1093
1094    # 9. Set docid
1095    &set_docid()
1096        or return &Error("Converter line: Can't find DB docid");
1097
1098    # 10. Set ddid
1099    &set_ddid()
1100        or return &Error("Converter line: Can't find DB ddid");
1101
1102    # 11. Set session variable
1103    my $settime = $DBH->prepare_cached(q{
1104        SET @THISTIME = ?, @THISHOUR = ?
1105        });
1106    $settime->execute($JOB->[0],$JOB->[1])
1107        or return &Error("Converter line: " .
1108            "Can't set THISTIME to '$JOB->[0]': " .
1109            $DBH->errstr());
1110
1111    # 12. Update row in jobs table
1112    my $update = $DBH->prepare_cached(qq{
1113        UPDATE ${A2PDB_TABLE_PREFIX}jobs_$MONTH
1114        SET docid=?, Jobname=?, NbPages=?, Kind=?, Counted=Counted | 1
1115        WHERE jid=?
1116        });
1117
1118    $update->execute($docid,$JOB->[3],$JOB->[11],$JOB->[12],$jid)
1119        or return &Error("Converter line: " .
1120            "Can't update row '$jid' in jobs_$MONTH table: " .
1121            $DBH->errstr());
1122
1123    # 13. At the same time, update jobs_nb and pages_nb
1124    if ($JOB->[11] =~ /^\d+$/) {
1125        $update = $DBH->prepare_cached(qq{
1126            UPDATE ${A2PDB_TABLE_PREFIX}Count
1127            SET afpjobs_nb=afpjobs_nb+?, jobs_nb=jobs_nb+1,
1128                pages_nb=pages_nb+$JOB->[11]
1129            WHERE countid=?
1130            });
1131
1132        $update->execute( $afpjid_counted ? 0 : 1 , $countid )
1133            or return &Error("Converter line: " .
1134                "Can't update $countid row in Count table: " .
1135                $DBH->errstr());
1136    }
1137
1138    # 14. Update also Destination counter
1139    $update = $DBH->prepare_cached(qq{
1140        UPDATE ${A2PDB_TABLE_PREFIX}Destinations
1141        SET Count=Count+1,
1142            FirstTime=IF(FirstTime<=\@THISTIME,FirstTime,\@THISTIME),
1143            LastTime=IF(LastTime>=\@THISTIME,LastTime,\@THISTIME)
1144        WHERE destid=? ;
1145        });
1146
1147    $update->execute($destid)
1148        or return &Error("Converter line: " .
1149            "Can't update $destid row in Destinations table: " .
1150            $DBH->errstr());
1151
1152    # 15. Update also Document counter
1153    $update = $DBH->prepare_cached(qq{
1154        UPDATE ${A2PDB_TABLE_PREFIX}Documents
1155        SET Count=Count+1,
1156            FirstTime=IF(FirstTime<=\@THISTIME,FirstTime,\@THISTIME),
1157            LastTime=IF(LastTime>=\@THISTIME,LastTime,\@THISTIME)
1158        WHERE docid=?
1159        });
1160
1161    $update->execute($docid)
1162        or return &Error("Converter line: " .
1163            "Can't update $docid row in Documents table: " .
1164            $DBH->errstr());
1165
1166    # 16. Update also DocDest counter
1167    $update = $DBH->prepare_cached(qq{
1168        UPDATE ${A2PDB_TABLE_PREFIX}DocDestCount
1169        SET Count=Count+1,
1170            FirstTime=IF(FirstTime<=\@THISTIME,FirstTime,\@THISTIME),
1171            LastTime=IF(LastTime>=\@THISTIME,LastTime,\@THISTIME)
1172        WHERE ddid=?
1173        });
1174
1175    $update->execute($ddid)
1176        or return &Error("Converter line: " .
1177            "Can't update $ddid row in DocDestCount table: " .
1178            $DBH->errstr());
1179
1180    # 17. Update also NBJobs counter in afpjobs
1181    $update = $DBH->prepare_cached(qq{
1182        UPDATE ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH
1183        SET NBJobs=NBJobs+1, Counted=Counted | 1,
1184            Date=IF(Date<=\@THISTIME,Date,\@THISTIME),
1185            DayTime=IF(DayTime<=\@THISHOUR,DayTime,\@THISHOUR)
1186        WHERE afpjid=?
1187        });
1188
1189    $update->execute($afpjid)
1190        or return &Error("Converter line: " .
1191            "Can't update $afpjid row in afpjobs_$MONTH table: " .
1192            $DBH->errstr());
1193
1194    # Finally. Commit changes and return commit success
1195    return &commit() ;
1196}
1197
1198sub stats_jobmanager {
1199    $JOB = shift ;
1200
1201    return &Error("JobManager line: Not connected to DB")
1202        unless (&a2p_db_connected());
1203
1204    ( $afpjid , $jid , $countid, $serviceid, $destid ) = ( 0, 0, 0, 0, 0 );
1205
1206    # 1. Check tables for job are availables
1207    &CheckJobsTables($JOB->[0])
1208        or return &Error("JobManager line: Tables not available for " .
1209             $JOB->[0] . " date");
1210
1211    # 2. Update serviceid as we known LOCKID
1212    &set_serviceid_from($JOB->[8])
1213        or return &Error("JobManager line: Can't find $JOB->[8] DB serviceid");
1214
1215    # 3. Set countid
1216    &set_countid()
1217        or return &Error("JobManager line: Can't find DB countid");
1218
1219    if ($JOB->[4]) {
1220        # 4. Set afpjid as extrated from jid
1221        my ( $JobID ) = $JOB->[2] =~ /^(.*)-\d+$/ ;
1222        &set_afpjid_from( $JobID )
1223            or return &Error("JobManager line: Can't find DB afpjid for job");
1224
1225        # 5. Set destid
1226        &set_destid_from( $JOB->[3] )
1227            or return &Error("JobManager line: Can't find DB destid");
1228
1229        # 6. Set jid
1230        my ( $number ) = $JOB->[2] =~ /^.*-(\d+)$/ ;
1231        &set_jid_from( $number )
1232            or return &Error("JobManager line: Can't find DB jid");
1233
1234        # 7. Check if it has been counted
1235        my $select = $DBH->prepare_cached(qq{
1236            SELECT Counted FROM ${A2PDB_TABLE_PREFIX}jobs_$MONTH
1237            WHERE jid=?
1238            });
1239
1240        $select->execute($jid)
1241            or return &Error("JobManager line: " .
1242                "Can't check if counted in jobs table: " .
1243                $DBH->errstr());
1244
1245        my ( $counted ) = $select->fetchrow_array ;
1246        $select->finish ;
1247
1248        # 8. Still return if it still has been counted
1249        return $counted if ($counted & 2);
1250
1251        # 9. Update row in jobs table
1252        my $update = $DBH->prepare_cached(qq{
1253            UPDATE ${A2PDB_TABLE_PREFIX}jobs_$MONTH
1254            SET State=?, Errors=?, Timing=?, Chrono=?, Counted=Counted | 2
1255            WHERE jid=?
1256            });
1257
1258        my ( $chrono ) = $JOB->[7] =~ /^([0-9.]+)/ ;
1259        my ( $timing ) = $JOB->[9] =~ /^([0-9.]+)/ ;
1260        $update->execute($JOB->[4],$JOB->[6],$timing,$chrono,$jid)
1261            or return &Error("JobManager line: " .
1262                "Can't update row '$jid' in jobs_$MONTH table: " .
1263                $DBH->errstr());
1264
1265        # 10. At the same time, update jobs_err when some found
1266        if ($JOB->[6] =~ /^\d+$/ and $JOB->[6]) {
1267            $update = $DBH->prepare_cached(qq{
1268                UPDATE ${A2PDB_TABLE_PREFIX}Count
1269                SET jobs_err=jobs_err+$JOB->[6]
1270                WHERE countid=?
1271                });
1272
1273            $update->execute($countid)
1274                or return &Error("JobManager line: " .
1275                    "Can't update $countid row in Count table: " .
1276                    $DBH->errstr());
1277        }
1278
1279    } else {
1280        # 4. Set afpjid
1281        &set_afpjid_from($JOB->[2])
1282            or return &Error("JobManager line: Can't find DB afpjid of afpjob");
1283
1284        # 5. Check if it has been counted
1285        my $select = $DBH->prepare_cached(qq{
1286            SELECT Counted FROM ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH
1287            WHERE afpjid=?
1288            });
1289
1290        $select->execute($afpjid)
1291            or return &Error("JobManager line: " .
1292                "Can't check if counted in afpjobs table: " .
1293                $DBH->errstr());
1294
1295        my ( $counted ) = $select->fetchrow_array ;
1296        $select->finish ;
1297
1298        # 6. Still return if it still has been counted
1299        return $counted if ($counted & 2);
1300
1301        # 7. Update row in afpjobs table
1302        my $update = $DBH->prepare_cached(qq{
1303            UPDATE ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH
1304            SET State=?, Errors=?, Chrono=?, Counted=Counted | 2
1305            WHERE afpjid=?
1306            });
1307
1308        my ( $chrono ) = $JOB->[7] =~ /^([0-9.]+)/ ;
1309        $update->execute($JOB->[5],$JOB->[6],$chrono,$afpjid)
1310            or return &Error("JobManager line: " .
1311                "Can't update row '$afpjid' in afpjobs_$MONTH table: " .
1312                $DBH->errstr());
1313
1314        # 8. At the same time, update afpjobs_nb and afpjobs_err Count
1315        if ($JOB->[6] =~ /^\d+$/ and $JOB->[6]) {
1316            $update = $DBH->prepare_cached(qq{
1317                UPDATE ${A2PDB_TABLE_PREFIX}Count
1318                SET afpjobs_nb=afpjobs_nb+?, afpjobs_tps=afpjobs_tps+?,
1319                    afpjobs_err=afpjobs_err+$JOB->[6]
1320                WHERE countid=?
1321                });
1322
1323        } else {
1324            $update = $DBH->prepare_cached(qq{
1325                UPDATE ${A2PDB_TABLE_PREFIX}Count
1326                SET afpjobs_nb=afpjobs_nb+?, afpjobs_tps=afpjobs_tps+?
1327                WHERE countid=?
1328                });
1329        }
1330
1331        $update->execute($counted ? 0 : 1 , $chrono/1000, $countid)
1332            or return &Error("JobManager line: " .
1333                "Can't update $countid row in Count table: " .
1334                $DBH->errstr());
1335    }
1336
1337    # Finally. Commit changes and return commit success
1338    return &commit() ;
1339}
1340
1341
1342################################################################################
1343### JobStatus API
1344################################################################################
1345my $service_list = 0 ;
1346my $service_list_timer = [] ;
1347
1348sub CheckJobsStatusTables {
1349    &CheckTable('Services') or
1350        return &Error("No 'Services' table available for managing Jobs status");
1351
1352    return &CheckTable('jobs_status') ||
1353        &Error("No 'jobs_status' table available for Jobs status");
1354}
1355
1356sub CheckJobsQueueTable {
1357    return &CheckTable('jobs_queue') ||
1358        &Error("No 'jobs_queue' table available for Jobs management");
1359}
1360
1361sub get_serviceid_info {
1362    # API to retrieve the services sid of local services
1363    &UPSTAT('JOBSTATUS-DB-GET-SERVICES-TRY');
1364
1365    # Update service list ref only one time per minute
1366    if ($service_list and @{$service_list_timer} and not @_) {
1367        return $service_list
1368            unless (&tv_interval($service_list_timer) > 60);
1369
1370        &UPSTAT('JOBSTATUS-DB-GET-SERVICES-RECHECK');
1371    }
1372
1373    # Get serviceid from table
1374    my $Services = &get_table_hash('Services', [qw(Server Name LockID)] )
1375        or return &Error("Can't get Services hash");
1376
1377    # Set timer
1378    $service_list_timer = [ &gettimeofday() ] ;
1379
1380    # Get infos object if still not created
1381    $service_list = new A2P::Infos() unless ($service_list) ;
1382
1383    if ( defined($_[0]) and $_[0] =~ /check all/i ) {
1384        # Compute service id for each servers to cache them
1385        foreach my $server ( keys(%{$Services}) ) {
1386            my $hash = $Services->{$server} ;
1387            &Debug("Caching $server sid");
1388            $service_list = $service_list->get_serviceid_of( $hash );
1389        }
1390
1391        # Return A2P::Infos object
1392        return $service_list ;
1393
1394    } elsif ( ! exists($Services->{$Server}) or
1395    ref($Services->{$Server}) !~ /^HASH/ ) {
1396        # No entry in Services table will need to initialize it
1397        return &Debug("No entry in table for '$Server' (@_)");
1398    }
1399
1400    # Initialize/update A2P::Infos object with the ref for this server
1401    # It returns 0 or itself as A2P::Infos object
1402    return $service_list->get_serviceid_of( $Services->{$Server} );
1403}
1404
1405sub _ms {
1406    # Get current millisecond
1407    my @time = &gettimeofday() ;
1408    # If a parameter is defined, we want to add the seconds with it. It could be
1409    # a negative number to get a timer to compare
1410    $time[0] += shift if @_ ;
1411    return int( $time[0] * 1000 + $time[1]/1000 );
1412}
1413
1414sub job_status_update {
1415    my $sid    = shift || 0 ;
1416    my $lockid = shift ;
1417    my $job    = shift ;
1418
1419    $serviceid = $service_list ?
1420        $service_list->get_serviceid_of($lockid) : 0 ;
1421
1422    # If serviceid is not known we should try to re-initialize Infos object as
1423    # the serviceid could just has been inserted in table
1424    unless ($serviceid) {
1425        &get_serviceid_info('check now'); # Just any arg to avoid timer check
1426        $serviceid = $service_list->get_serviceid_of($lockid) || 0 ;
1427    }
1428
1429    # Finally the serviceid not exists in table we should create a new one
1430    unless ($serviceid) {
1431        # Use default statistics APIs
1432        my $sname = $service_list->get_servicename_of($lockid) || '' ;
1433        return &Error("Can't handle service $lockid without configuration")
1434            unless $sname ;
1435        &stats_set_service($sname);
1436        &set_serviceid_from($lockid);
1437        unless ($serviceid) {
1438            &get_serviceid_info('recheck now');
1439            $serviceid = $service_list->get_serviceid_of($lockid) || 0 ;
1440        }
1441        return &Error("Can't set new serviceid for $lockid") unless $serviceid ;
1442    }
1443
1444    # Prepare row to insert or update
1445    my @row = map { defined($_) ? $_ : "" }
1446        $serviceid, $job, @_[0..2], join("",@{$_[3]}), @_[4..10] ;
1447
1448    # Check if we can find the job in DB if it has no sid as it could has been
1449    # run in on another computer and/or service. Check also if we are rechecking
1450    # status after an a2p-status service restart
1451    unless ( $sid =~ /^\d+$/ and $sid ) {
1452        my $select = $DBH->prepare_cached(qq{
1453            SELECT sid FROM ${A2PDB_TABLE_PREFIX}jobs_status
1454            WHERE Jobname=? OR ( serviceid=? and JobID=? )
1455            });
1456
1457        &UPSTAT('JOBSTATUS-DB-CHECKSID');
1458
1459        &Debug("Check if job $row[8] is still in DB");
1460        $select->execute( $row[8], $serviceid, $job )
1461            or return &Error("Can't check SID in jobs_status table: ".
1462                $DBH->errstr());
1463
1464        my ( $thissid ) = $select->fetchrow_array ;
1465        $select->finish ;
1466        $sid = $thissid
1467            if (defined($thissid) and $thissid and $thissid =~ /^\d+$/);
1468    }
1469
1470    if ($sid) {
1471        &TIMESTAT('JOBSTATUS-DB-UPDATE');
1472        # Prepare to update a row if revision is not up to date
1473        my $update = $DBH->prepare_cached(qq{
1474            UPDATE ${A2PDB_TABLE_PREFIX}jobs_status
1475            SET serviceid=?, JobID=?, Revision=?, Timer=?, Step=?, Steps=?,
1476                Born=?, Date=?, JobName=?, CurrentStatus=?, NbJobs=?, DestIDs=?,
1477                Infos=?
1478            WHERE sid = ? AND (Revision < ? OR Born < ?)
1479            });
1480
1481        &Debug("Update row $sid for $job rev$row[2] with '@row'");
1482        &UPSTAT('JOBSTATUS-DB-UPDATE');
1483
1484        $update->execute(@row, $sid, $row[2], $row[6])
1485            or $sid = &Error("Can't update $job db jobstatus: ".$DBH->errstr());
1486
1487        if ($sid and $DBH->commit()) {
1488            &UPSTAT('JOBSTATUS-DB-UPDATED');
1489
1490        } else {
1491            $sid = 0 ;
1492            &Debug("Bad commit on $job Job Status update");
1493        }
1494
1495        &TIMESTAT('JOBSTATUS-DB-UPDATE');
1496
1497    } else {
1498        # Insert a new row
1499
1500        # Do a table optimization and purge at last every 60 minutes
1501        &Warn("jobs_status DB table not optimized and purged")
1502            unless (&job_status_purge());
1503
1504        &TIMESTAT('JOBSTATUS-DB-INSERT');
1505        &Debug("Insert '@row' into jobs_status table");
1506        # Insert new jobstatus in table
1507        my $insert = $DBH->prepare_cached(qq{
1508            INSERT INTO ${A2PDB_TABLE_PREFIX}jobs_status
1509                (serviceid,JobID,Revision,Timer,Step,Steps,
1510                Born,Date,JobName,CurrentStatus,NbJobs,DestIDs,Infos)
1511                VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)
1512            });
1513
1514        &UPSTAT('JOBSTATUS-DB-INSERT');
1515        my $executed = $insert->execute(@row)
1516            || &Error("Can't insert '$row[1]' JobID in job_status table: ".
1517               $DBH->errstr());
1518
1519        # Return the sid inserted
1520        if ($executed and $DBH->commit()) {
1521            $sid = &mysql_insert_id() ;
1522            &UPSTAT('JOBSTATUS-DB-INSERTED');
1523            &MAXMINSTAT('JOBSTATUS-BORN-TO-DB-TIMING-ms', &_ms() - $row[6]);
1524            &Debug("Last insertid SID is $sid");
1525
1526        } elsif ($executed) {
1527            &Error("No sid retrieved for '$row[1]' Job Status insertion");
1528        }
1529
1530        &TIMESTAT('JOBSTATUS-DB-INSERT');
1531    }
1532
1533    return $sid ;
1534}
1535
1536my %optimize = () ; # Time out list
1537sub optimize_table {
1538    return unless ($DO_DB_OPTIMIZATION);
1539    my $table = shift or return "" ;
1540
1541    # Do a table optimization if 1 hour has pasted since last optimization
1542    return 0 if ( exists($optimize{$table}) and time < $optimize{$table} );
1543
1544    # Protect table name
1545    return 0 unless ( $table =~ /^\w+$/ );
1546
1547    # Set time out to next hour
1548    $optimize{$table} = time + 3600 ;
1549
1550    &TIMESTAT(uc($table).'-TABLE-OPTIMIZATION');
1551    # Do table optimization and disable it if it fails
1552    unless ($DBH->do("OPTIMIZE TABLE ${A2PDB_TABLE_PREFIX}$table")) {
1553        &Info("Can't optimize $table DB table");
1554        # Don't try to optimize until next service restart
1555        $optimize{$table} = [] ;
1556    }
1557    &TIMESTAT(uc($table).'-TABLE-OPTIMIZATION');
1558    &UPSTAT(uc($table).'-TABLE-OPTIMIZED');
1559
1560    return $optimize{$table} ;
1561}
1562
1563sub job_status_delete {
1564    # A valid SID must be provided
1565    my $sid = shift || return 0 ;
1566    return 0 unless ($sid =~ /^\d+$/);
1567
1568    $DBH->do(qq{
1569        DELETE FROM ${A2PDB_TABLE_PREFIX}jobs_status
1570        WHERE sid=$sid
1571        })
1572        or return &Error("Can't delete row $sid in jobs_status DB table");
1573
1574    $DBH->commit()
1575        or &Error("Can't commit row $sid deletion in jobs_status DB table");
1576    &UPSTAT('JOBSTATUS-DB-DELETE');
1577    &Debug("Deleted status with sid $sid in DB");
1578
1579    # Do a table optimization and purge at last every 60 minutes
1580    &Warn("jobs_status DB table not optimized and purged")
1581        unless (&job_status_purge());
1582
1583    return 1 ;
1584}
1585
1586my $purge_timout = 0 ;
1587sub job_status_purge {
1588    # Prevent purge too soon by calling optimize_table just before which will
1589    # will return true only every half an hour
1590    return 1 unless ( $STATUS_DB_MAXAGE and time > $purge_timout );
1591
1592    # Reset time out
1593    $purge_timout = time + 1800 ;
1594
1595    # Delete max_aged rows, Timer is ms avec parameter is hour
1596    my $time    = time * 1000 ;
1597    my $timeout = $time - 3600000 * $STATUS_DB_MAXAGE ;
1598
1599    unless ($NO_SYSLOG_DEBUG) {
1600        # Log which row will be deleted when debug mode is activated
1601        &Debug("Try deleting job status row at time $time ms since EPOCH...");
1602        &Debug("Deleting any job status DB row with Timer lower than $timeout");
1603        my $select = $DBH->prepare_cached(qq{
1604            SELECT s.LockID,j.JobID,j.Revision,j.Timer,j.Step,j.Steps,
1605                   j.Born,j.JobName,j.CurrentStatus,j.NbJobs,j.Infos,j.dbtime
1606            FROM ${A2PDB_TABLE_PREFIX}Services s,
1607                 ${A2PDB_TABLE_PREFIX}jobs_status j
1608            WHERE j.Timer<? AND j.CurrentStatus LIKE 'DONE'
1609                AND s.serviceid=j.serviceid
1610        });
1611
1612        if ($select->execute($timeout)) {
1613            while ( my @row = $select->fetchrow_array ) {
1614                &Debug("Deleting status row: @row");
1615            }
1616        }
1617
1618        $select->finish ;
1619    }
1620
1621    # DO DELETE
1622    $DBH->do(qq{
1623        DELETE FROM ${A2PDB_TABLE_PREFIX}jobs_status
1624        WHERE CurrentStatus LIKE 'DONE' AND Timer<$timeout
1625    })
1626        or return &Error("Can't do aged deletion in jobs_status table");
1627
1628    $DBH->commit()
1629        or return &Error("Can't commit aged deletion in jobs_status table");
1630
1631    # Finaly optimize the table
1632    &optimize_table('jobs_status');
1633
1634    return 1 ;
1635}
1636
1637sub is_job_status_updated {
1638    my $sid = shift or return 0 ;
1639    my $rev = shift || 0 ;
1640
1641    return 0 unless (&a2p_db_connected);
1642
1643    my $select = $DBH->prepare_cached(qq{
1644          SELECT s.LockID,j.JobID,j.Revision,j.Timer,j.Step,j.Steps,
1645                 j.Born,j.Date,j.JobName,j.CurrentStatus,j.NbJobs
1646          FROM ${A2PDB_TABLE_PREFIX}Services s, ${A2PDB_TABLE_PREFIX}jobs_status j
1647        WHERE j.sid=? and s.serviceid=j.serviceid
1648        });
1649
1650    $select->execute($sid)
1651        or return &Error("Can't check $sid SID update in jobs_status table: ".
1652            $DBH->errstr());
1653
1654    my @row = $select->fetchrow_array ;
1655    $select->finish ;
1656
1657    &UPSTAT('JOBSTATUS-DB-IS-UPDATED?');
1658
1659    # Check if revision has been updated
1660    if ( $row[2] > $rev ) {
1661        &Debug("Found new revision $row[2] for $row[1] (sid=$sid) in DB");
1662        return @row ;
1663    } else {
1664        return () ;
1665    }
1666}
1667
1668# List of accepted filter and the corresponding SQL statement to add computed
1669# in a sprintf call with the transmitted value
1670my %sql_filters = (
1671    LOCKID      =>  "s.LockID LIKE '%s'",
1672    NAME        =>  "( j.JobID LIKE '*%s*' OR j.JobName LIKE '*%s*' )",
1673    DAY         =>  "j.Date LIKE '%s'",
1674    STATUS      =>  {
1675        'any'            => [ "j.CurrentStatus LIKE 'AB%'",
1676                              "j.CurrentStatus LIKE 'KO'",
1677                              "NOT ( j.CurrentStatus LIKE 'DO%'
1678                              OR j.CurrentStatus LIKE 'KO'
1679                              OR j.CurrentStatus LIKE 'AB%' )",
1680                              "j.CurrentStatus LIKE 'DO%'" ],
1681        'done'           => [ "j.CurrentStatus LIKE 'DO%'" ],
1682        'ko|abterm'      => [ "j.CurrentStatus LIKE 'AB%'",
1683                              "j.CurrentStatus LIKE 'KO'" ],
1684        'done|ko|abterm' => [ "j.CurrentStatus LIKE 'AB%'",
1685                              "j.CurrentStatus LIKE 'KO'",
1686                              "j.CurrentStatus LIKE 'DO%'" ],
1687        },
1688    NOTSTATUS   =>  {
1689        'done'           => [
1690            "j.CurrentStatus LIKE 'AB%'",
1691            "j.CurrentStatus LIKE 'KO'",
1692            "NOT ( j.CurrentStatus LIKE 'DO%' OR j.CurrentStatus LIKE 'KO'
1693            OR j.CurrentStatus LIKE 'AB%' )" ],
1694        'done|ko|abterm' => [ "NOT ( j.CurrentStatus LIKE 'DO%'
1695            OR j.CurrentStatus LIKE 'KO' OR j.CurrentStatus LIKE 'AB%' )" ],
1696        'ko|abterm'      => [ "NOT ( j.CurrentStatus LIKE 'DO%'
1697            OR j.CurrentStatus LIKE 'KO' OR j.CurrentStatus LIKE 'AB%' )",
1698            "j.CurrentStatus LIKE 'DO%'" ]
1699        },
1700    DESTID      =>  "j.DestIDs LIKE '*%s*'"
1701);
1702
1703my $get_next_select ;
1704my $get_next_count ;
1705my @statements = () ;
1706sub get_next_jobstatus {
1707
1708    &a2p_db_connect unless $DBH ;
1709
1710    unless (defined($get_next_select) and $get_next_select) {
1711        my $sid = shift || 0 ;
1712        $sid = 0 unless ( $sid =~ /^\d+$/ );
1713        my $sql = '' ;
1714
1715        # Prepare SQL template
1716        my $template = qq{
1717              SELECT j.sid as SID, s.LockID as LOCKID, j.JobID as JOBID,
1718                     j.Revision as REV, j.Timer as TIMER, j.Step as STEP ,
1719                     j.Steps as STATE, j.Born as BORN, j.Date as DAY,
1720                     j.JobName as AFP, j.CurrentStatus as STATUS,
1721                     j.NbJobs as NBJOBS, j.DestIDs as DESTIDS, j.Infos as INFOS,
1722                     j.dbtime as DBTIME
1723              FROM
1724                   ${A2PDB_TABLE_PREFIX}Services s,
1725                   ${A2PDB_TABLE_PREFIX}jobs_status j
1726            WHERE j.sid>$sid AND s.serviceid=j.serviceid
1727            %s
1728            ORDER BY SID DESC
1729            %s
1730            } ;
1731
1732        $get_next_count = 0 ;
1733
1734        # Compute a SQL statement to add as WHERE to get faster requests
1735        my $filter = "" ;
1736        my $limit  = "" ;
1737
1738        # Filters must be provided in a hash as column/like value pairs
1739        if ( @_ and defined($_[0]) and $_[0] ) {
1740            my $hash = shift ;
1741
1742            # Check if we are limited in row number to retrieve
1743            if ( exists($hash->{LIMIT}) ) {
1744                $limit = " LIMIT 0," . $hash->{LIMIT}
1745                    if ( $hash->{LIMIT} =~ /^\d+$/ );
1746                delete $hash->{LIMIT} ;
1747            }
1748
1749            # Check status filter
1750            my @Status = () ;
1751            my ( $status_key ) = grep { /STATUS/i } keys(%{$hash}) ;
1752            if ( $status_key ) {
1753                @Status = @{$sql_filters{$status_key}->{$hash->{$status_key}}} ;
1754                delete $hash->{$status_key} ;
1755
1756            } else {
1757                @Status = @{$sql_filters{'STATUS'}->{'any'}} ;
1758            }
1759
1760            # Check other keys to prepare filter
1761            foreach my $key ( keys(%{$hash}) ) {
1762                next unless (exists($sql_filters{$key}));
1763                my $value = $hash->{$key} ;
1764                if (ref($sql_filters{$key}) =~ /^HASH/i) {
1765                    $filter .= " AND " . $sql_filters{$key}->{$value} ;
1766
1767                } else {
1768                    # Strip illegal chars
1769                    $value =~ s/[^%*0-9A-Za-z_.-]//g ;
1770                    $filter .= " AND " . sprintf($sql_filters{$key},$value) ;
1771                }
1772            }
1773
1774            # Prepare definitive SQL
1775            @statements = map {
1776                sprintf( $template, ' AND ' . $_ . $filter, $limit )
1777            } @Status ;
1778            $sql = shift @statements ;
1779
1780        } else {
1781            # Set default statement
1782            $sql = sprintf( $template, '' );
1783            @statements = () ;
1784        }
1785
1786        $get_next_select = $DBH->prepare_cached($sql);
1787        $get_next_select->execute()
1788            or return $get_next_select = &Error("Can't get jobs status in " .
1789                "jobs_status table: " . $DBH->errstr());
1790    }
1791
1792    my $row = $get_next_select->fetchrow_hashref ;
1793
1794    # Try next statements in list when available
1795    while ( !(defined($row) or ref($row) =~ /^HASH/) and @statements ) {
1796        if ($DBH->errstr) {
1797            &Error("Problem when retrieving jobs status in jobs_status table: ".
1798                $DBH->errstr());
1799        }
1800        $get_next_select->finish ;
1801        my $sql = shift @statements ;
1802        if ( defined($sql) and $sql ) {
1803            $get_next_select = $DBH->prepare_cached($sql);
1804            $get_next_select->execute()
1805                or return $get_next_select = &Error("Can't get jobs status " .
1806                    "with next statement: " . $DBH->errstr());
1807            $row = $get_next_select->fetchrow_hashref ;
1808        }
1809    }
1810
1811    unless (defined($row) and ref($row) =~ /^HASH/) {
1812        &MAXSTAT('JOBSTATUS-DB-GET',$get_next_count);
1813        if ($DBH->errstr) {
1814            &Error("Problem when retrieving jobs status in jobs_status table: ".
1815                $DBH->errstr());
1816        }
1817        $get_next_select->finish ;
1818        $get_next_select = undef ;
1819        &a2p_db_disconnect ;
1820    }
1821
1822    return $row ;
1823}
1824
1825sub job_queue_update {
1826    my $jid  = shift || 0 ;
1827    my $mode = shift || 0 ;
1828    my $serv = shift || 0 ;
1829    my $name = shift || "";
1830    my $afp  = shift || "" ;
1831    my $rev  = shift || 0 ; # set it to negative value to delete the jid ref row
1832    my $data = shift ;
1833
1834    &a2p_db_connect unless $DBH ;
1835
1836    if (!defined($data)) {
1837        # This is a synchronization check or a delete request
1838        if ( $jid and $rev >= 0 ) {
1839            &TIMESTAT('A2PJOB-DB-SYNC');
1840
1841            # Prepare to update a row if revision is not up to date
1842            my $synchro = $DBH->prepare_cached(qq{
1843                SELECT Revision, Object
1844                FROM ${A2PDB_TABLE_PREFIX}jobs_queue
1845                WHERE jid = ?
1846                });
1847
1848            &Debug("Syncing $name rev$rev in jobs_queue table");
1849            &UPSTAT('A2PJOB-DB-SYNC');
1850
1851            $synchro->execute($jid)
1852                or $jid = &Error("Can't update $name in db: ".$DBH->errstr());
1853            my @synchro = $synchro->fetchrow_array ;
1854            $synchro->finish ;
1855
1856            &TIMESTAT('A2PJOB-DB-UPDATE');
1857            return $synchro[0] > $rev ? $synchro[1] : undef ;
1858
1859        } elsif ( $jid and $jid =~ /^\d+$/ and $rev < 0 ) {
1860            &TIMESTAT('A2PJOB-DB-DELETE');
1861            &Debug("Deleting row $jid of $name in jobs_queue table");
1862
1863            # Request deletion in DB
1864            my $result = $DBH->do(qq{
1865                DELETE FROM ${A2PDB_TABLE_PREFIX}jobs_queue
1866                WHERE jid=$jid
1867                })
1868                || &Error("Can't delete row $jid in jobs_queue DB table");
1869
1870            $result = $DBH->commit()
1871                || &Error("Can't commit row $jid deletion in jobs_queue table")
1872                if ($result);
1873
1874            &TIMESTAT('A2PJOB-DB-DELETE');
1875            return undef unless $result ;
1876            &UPSTAT('A2PJOB-DB-DELETE');
1877
1878            # Do a table optimization at some time
1879            &optimize_table('jobs_queue');
1880
1881            return $result ;
1882
1883        } elsif ($jid) {
1884            &Error("Invalid '$jid' jid detected");
1885            return undef ;
1886
1887        } else {
1888            &Error("Can't synchronize '$name' without jid set");
1889            return undef ;
1890        }
1891    }
1892
1893    # Check if jobname is in DB and then get the jid
1894    if (!$jid and ($name or $afp)) {
1895        &TIMESTAT('A2PJOB-DB-FIND-JID');
1896        # Prepare to update a row if revision is not up to date
1897        my $select ;
1898        if ($name) {
1899            $select = $DBH->prepare_cached(qq{
1900                SELECT jid FROM ${A2PDB_TABLE_PREFIX}jobs_queue
1901                WHERE JobID=?
1902                });
1903        } else {
1904            $select = $DBH->prepare_cached(qq{
1905                SELECT jid FROM ${A2PDB_TABLE_PREFIX}jobs_queue
1906                WHERE Jobname=?
1907                });
1908        }
1909
1910        &UPSTAT('A2PJOB-DB-CHECKJID');
1911
1912        &Debug("Check if job $name or $afp is in DB");
1913        $select->execute( $name ? $name : $afp )
1914            or return &Error("Can't check JID in jobs_queue table: ".
1915                $DBH->errstr());
1916
1917        my ( $thisjid ) = $select->fetchrow_array ;
1918        $select->finish ;
1919        $jid = $thisjid
1920            if (defined($thisjid) and $thisjid);
1921        &TIMESTAT('A2PJOB-DB-FIND-JID');
1922    }
1923
1924    if ($jid) {
1925        &TIMESTAT('A2PJOB-DB-UPDATE');
1926        # Prepare to update a row if revision is not up to date
1927        my $update = $DBH->prepare_cached(qq{
1928            UPDATE ${A2PDB_TABLE_PREFIX}jobs_queue
1929            SET Mode=?, serviceid=?, JobID=?, JobName=?, Revision=?, Object=?
1930            WHERE jid=?
1931            });
1932
1933        &Debug("Updating $name rev$rev in jobs_queue table");
1934        &UPSTAT('A2PJOB-DB-UPDATE');
1935
1936        $update->execute($mode, $serv, $name, $afp, $rev, $data, $jid)
1937            or $jid = &Error("Can't update $name in db: ".$DBH->errstr());
1938
1939        if ($jid and $DBH->commit()) {
1940            &UPSTAT('A2PJOB-DB-UPDATED');
1941
1942        } else {
1943            $jid = 0 ;
1944            &Warn("Bad commit on $name AfpJob update");
1945        }
1946
1947        &TIMESTAT('A2PJOB-DB-UPDATE');
1948
1949    } elsif ($name) {
1950        &TIMESTAT('A2PJOB-DB-INSERT');
1951        &Debug("Inserting '$name', '$afp' into jobs_queue table");
1952        # Insert new jobstatus in table
1953        my $insert = $DBH->prepare_cached(qq{
1954            INSERT INTO ${A2PDB_TABLE_PREFIX}jobs_queue
1955                (Mode,serviceid,JobID,JobName,Revision,Object)
1956                VALUES(?,?,?,?,?,?)
1957            });
1958
1959        &UPSTAT('A2PJOB-DB-INSERT');
1960        my $executed = $insert->execute($mode,$serv,$name, $afp, $rev, $data)
1961            || &Error("Can't insert '$name' AfpJob in jobs_queue table: ".
1962               $DBH->errstr());
1963
1964        # Return the sid inserted
1965        if ($executed and $DBH->commit()) {
1966            $jid = &mysql_insert_id() ;
1967            &UPSTAT('A2PJOB-DB-INSERTED');
1968
1969        } elsif ($executed) {
1970            &Error("No jid retrieved for '$name' AfpJob insertion");
1971        }
1972
1973        &TIMESTAT('A2PJOB-DB-INSERT');
1974    }
1975
1976    return $jid ;
1977}
1978
1979my $get_next_afpjob_select ;
1980my $get_next_afpjob_count ;
1981sub get_next_afpjob {
1982    my $mode = shift || 0 ;
1983    my $serviceid = shift || 0 ;
1984
1985    return 0 unless (&a2p_db_connected);
1986
1987    unless (defined($get_next_afpjob_select) and $get_next_afpjob_select) {
1988        $get_next_afpjob_count = 0 ;
1989
1990        # Necessary to make the next SELECT working
1991        $DBH->commit();
1992
1993        if ($serviceid) {
1994            &UPSTAT('A2PJOB-DB-CHECK-SERVICEID-'.$serviceid);
1995            $get_next_afpjob_select = $DBH->prepare_cached( qq{
1996                  SELECT j.jid as JID, j.Mode as MODE, j.JobID as JOBID,
1997                         j.JobName as JOBNAME, j.Revision as REV,
1998                         j.Object as THAWN, j.dbtime as DBTIME
1999                  FROM
2000                       ${A2PDB_TABLE_PREFIX}jobs_queue j
2001                WHERE Mode=? AND serviceid=?
2002                ORDER BY JID DESC
2003                } );
2004            $get_next_afpjob_select->execute($mode,$serviceid)
2005                or return $get_next_afpjob_select = &Error(
2006                "Can't get rows in jobs_queue table for serviceid $serviceid:" .
2007                $DBH->errstr());
2008
2009        } else {
2010            &UPSTAT('A2PJOB-DB-CHECK-SERVICEID-UNDEF');
2011            $get_next_afpjob_select = $DBH->prepare_cached( qq{
2012                  SELECT j.jid as JID, j.Mode as MODE, j.JobID as JOBID,
2013                         j.JobName as JOBNAME, j.Revision as REV,
2014                         j.Object as THAWN, j.dbtime as DBTIME
2015                  FROM
2016                       ${A2PDB_TABLE_PREFIX}jobs_queue j
2017                WHERE Mode=?
2018                ORDER BY JID DESC
2019                } );
2020            $get_next_afpjob_select->execute($mode)
2021                or return $get_next_afpjob_select = &Error(
2022                "Can't get rows in jobs_queue table in mode $mode: " .
2023                $DBH->errstr());
2024        }
2025    }
2026
2027    my $row = $get_next_afpjob_select->fetchrow_hashref ;
2028
2029    unless (defined($row) and ref($row) =~ /^HASH/) {
2030        &UPSTAT('A2PJOB-DB-ALL-GOT');
2031        &MAXSTAT('A2PJOB-DB-GET',$get_next_afpjob_count);
2032        if ($DBH->errstr) {
2033            &Error("Error when retrieving afpjob objects in jobs_queue table: ".
2034                $DBH->errstr());
2035        }
2036        $get_next_afpjob_select->finish ;
2037        undef $get_next_afpjob_select ;
2038
2039        # Avoid disconnection in a2p-status
2040        &a2p_db_disconnect unless ($serviceid);
2041
2042        $row = 0 ;
2043    }
2044
2045    return $row ;
2046}
2047
2048sub a2p_db_avoid_disconnect_in_forked {
2049    # This sub must be called in forked processes to avoid closing parent
2050    # connections when forked process is exiting
2051    $DBH->{'InactiveDestroy'} = 1 if (defined($DBH) and $DBH);
2052}
2053
2054################################################################################
2055### Module End
2056################################################################################
2057
2058&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
2059
20601;
Note: See TracBrowser for help on using the repository browser.