# # Copyright (c) 2004-2007 - Consultas, PKG.fr # # This file is part of A2P. # # A2P is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # A2P is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with A2P; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # # $Id: DB.pm 3 2007-10-18 16:20:19Z guillaume $ # package A2P::DB ; use strict ; use Time::HiRes qw( gettimeofday tv_interval usleep ) ; use DBI ; use A2P::Globals ; use A2P::Syslog ; use A2P::Infos ; BEGIN { use Exporter (); our ( $VERSION, @ISA, @EXPORT, @EXPORT_OK ); $VERSION = sprintf "%s", q$Rev: 1426 $ =~ /(\d[0-9.]+)\s+/ ; @ISA = qw(Exporter); # Export common API @EXPORT = qw( &a2p_db_connect &a2p_db_disconnect &a2p_db_connected &check_commit &a2p_db_avoid_disconnect_in_forked ); # Export OK jobstatus API push @EXPORT_OK, qw( &CheckJobsStatusTables &get_serviceid_info &job_status_update &is_job_status_updated &get_next_jobstatus &job_status_delete &CheckJobsQueueTable &job_queue_update &get_next_afpjob ); # Export OK statistics API push @EXPORT_OK, qw( &a2p_db_statistics_ready &CheckJobsTables &stats_set_service &stats_jobmanager &stats_converter ); } our $VERSION ; my $DBH ; my %TABLES ; my $CACHE ; my $JOB ; # Array ref set in &stats_converter or &stats_jobmanager my $MONTH ; # String set in &CheckJobsTables my %CREATE_TABLE = ( afpjobs_YYYYMM => q{ CREATE TABLE afpjobs_YYYYMM ( afpjid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, JobID TEXT NOT NULL, serviceid INTEGER UNSIGNED NOT NULL, Date VARCHAR(8) NOT NULL, DayTime TIME NOT NULL, State VARCHAR(2) NOT NULL, Errors SMALLINT UNSIGNED NOT NULL, NBJobs SMALLINT UNSIGNED NOT NULL, Chrono FLOAT NOT NULL, Counted TINYINT UNSIGNED NOT NULL DEFAULT 0, PRIMARY KEY(afpjid), INDEX jobids_index(JobID(16),serviceid) ) TYPE=InnoDB; }, jobs_YYYYMM => q{ CREATE TABLE jobs_YYYYMM ( jid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, afpjid INTEGER UNSIGNED NOT NULL, Number SMALLINT UNSIGNED NOT NULL, destid INTEGER UNSIGNED NOT NULL, docid INTEGER UNSIGNED NOT NULL, NbPages SMALLINT UNSIGNED NOT NULL, State VARCHAR(2) NULL, Timing FLOAT NOT NULL, Chrono FLOAT NOT NULL, Kind VARCHAR(1) NULL, Errors TINYINT UNSIGNED NOT NULL, Counted TINYINT UNSIGNED NOT NULL DEFAULT 0, Jobname TEXT NULL, PRIMARY KEY(jid), UNIQUE INDEX unique_jids_index(afpjid,Number) ) TYPE=InnoDB; }, jobs_status => q{ CREATE TABLE jobs_status ( sid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, serviceid INTEGER UNSIGNED NULL, JobID TEXT NULL, Revision INTEGER UNSIGNED NULL, Timer BIGINT UNSIGNED NULL, Step TINYINT UNSIGNED NULL, Steps VARCHAR(13) NULL, Born BIGINT UNSIGNED NULL, Date VARCHAR(8) NULL, Jobname TEXT NULL, CurrentStatus TEXT NULL, NbJobs INTEGER UNSIGNED NULL, DestIDs TEXT NULL, Infos TEXT NULL, dbtime TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY(sid), INDEX jobs_status_index(serviceid, JobID(16)), INDEX jobs_status_jobname_index(Jobname(16)) ) TYPE=InnoDB; }, jobs_queue => q{ # jobs_queue CREATE TABLE jobs_queue ( jid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, Mode TINYINT # Mode = 0 -> User mode ; Mode > 0 -> Advanced mode UNSIGNED NULL, serviceid INTEGER UNSIGNED NULL, JobID TEXT NULL, Jobname TEXT NULL, Revision INTEGER UNSIGNED NULL, Object TEXT NULL, dbtime TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY(jid), INDEX mode_index(Mode,serviceid) ) TYPE=InnoDB; }, Services => q{ CREATE TABLE Services ( serviceid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, Name TEXT NOT NULL, Server TEXT NOT NULL, LockID TEXT NOT NULL, PRIMARY KEY(serviceid), UNIQUE INDEX unique_services_index(Name(20),Server(30),LockID(10)) ) TYPE=InnoDB; }, Count => q{ CREATE TABLE Count ( countid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, serviceid INTEGER UNSIGNED NOT NULL, YearMonth VARCHAR(6) NOT NULL, afpjobs_nb INTEGER UNSIGNED NOT NULL, afpjobs_tps FLOAT NOT NULL, afpjobs_err INTEGER UNSIGNED NOT NULL, jobs_nb INTEGER UNSIGNED NOT NULL, jobs_err INTEGER UNSIGNED NOT NULL, pages_nb INTEGER UNSIGNED NOT NULL, PRIMARY KEY(countid), UNIQUE INDEX unique_counts_index(serviceid,YearMonth) ) TYPE=InnoDB; }, Destinations => q{ CREATE TABLE Destinations ( destid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, DestIdName TEXT NOT NULL, FirstTime DATE NOT NULL, LastTime DATE NOT NULL, Count INTEGER UNSIGNED NOT NULL, PRIMARY KEY(destid), UNIQUE INDEX unique_destids_index(DestIdName(8)) ) TYPE=InnoDB; }, Documents => q{ CREATE TABLE Documents ( docid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, PageDef TEXT NOT NULL, FormDef TEXT NOT NULL, DocName TEXT NOT NULL, PageFormats TEXT NOT NULL, CopyGroups TEXT NOT NULL, Logos TEXT NOT NULL, FirstTime DATE NOT NULL, LastTime DATE NOT NULL, Count INTEGER UNSIGNED NOT NULL, PRIMARY KEY(docid), INDEX documents_index(PageDef(6),FormDef(6),DocName(4)) ) TYPE=InnoDB; }, DocDestCount => q{ CREATE TABLE DocDestCount ( ddid INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, destid INTEGER UNSIGNED NOT NULL, docid INTEGER UNSIGNED NOT NULL, FirstTime DATE NOT NULL, LastTime DATE NOT NULL, Count INTEGER UNSIGNED NOT NULL, PRIMARY KEY(ddid), UNIQUE INDEX ddids_index(destid,docid) ) TYPE=InnoDB; } ); my $Server ; if ( -e '/proc/sys/kernel/hostname' ) { open *HOSTNAME , '<', "/proc/sys/kernel/hostname" or die "Can't read hostname from kernel: $!"; $Server = ; close HOSTNAME ; } else { $Server = qx/hostname -s/ ; } $Server = 'unknown' unless (defined($Server) and $Server); chomp $Server ; # String set in &stats_set_service, used in &set_serviceid_from my $service = "" ; my $serviceid = 0 ; # Value set in &set_serviceid_from my $countid = 0 ; # Value set in &set_countid my $afpjid = 0 ; # Value set in &set_afpjid_from my $jid = 0 ; # Value set in &set_jid_from my $destid = 0 ; # Value set in &set_destid_from my $docid = 0 ; # Value set in &set_docid my $ddid = 0 ; # Value set in &set_ddid my $actual_commit = 0 ; # Commit counter my $commit_timer = [ &gettimeofday() ] ; # Commit timer my @commit_timers = () ; # Commit times to check timer adjustement my $max_timer = 10 ; # 10 seconds is the max timer, will be updated # as needed but not other 10s # Set to something returning 1 only for debugging during development sub __A2P_DB_DEBUG__ { 0 } ################################################################################ ### Common API ################################################################################ my $connected_timer = [ &gettimeofday() ] ; sub a2p_db_connect { if ($RECONNECT_TIMER) { # Force a disconnection from DB if (&tv_interval($connected_timer) >= $RECONNECT_TIMER) { &UPSTAT('A2P-DB-RECONNECT-TIMEOUT'); $connected_timer = [ &gettimeofday() ] ; &a2p_db_disconnect(); } } # Get a connection to DB $DBH = DBI->connect_cached( $DBI_DSN, $DBI_USER ? $DBI_USER : $Progname , $DBI_PASSWD, { PrintError => 1, RaiseError => 0, AutoCommit => 0, __statistics__ => __FILE__.__LINE__, Profile => 0 # Can be 2, 4 or 6 to profile SQL requests (see man DBI::Profile) }) or return &Error("Can't connect to '$DBI_DSN': " . $DBI::errstr); &UPSTAT('A2P-DB-CONNECT-CACHED'); return defined($DBH) ? $DBH : 0 ; } sub a2p_db_connected { # Check we are still connected to DB my $connected = (defined($DBH) and $DBH) ? 1 : 0 ; &UPSTAT('A2P-DB-CONNECTED-CHECK-'.$connected); # Force a reconnection to DB if RECONNECT_TIMER is set and delay is passed $connected = &a2p_db_connect() if ($RECONNECT_TIMER and &tv_interval($connected_timer) >= $RECONNECT_TIMER); # Disconnect on error detection if ( $connected and defined($DBH->err()) and $DBH->err() ) { &Error("Disconnecting on DBI error detection: " . $DBH->errstr()); $connected = 0 ; } # Check the ping status a try reconnect is False unless ( $connected and $DBH->ping ) { &Info("DB Disconnection detected, reconnecting...") if $connected ; &a2p_db_disconnect(); $connected = &a2p_db_connect(); } return $connected ; } sub a2p_db_disconnect { my @caller = caller ; local $" = '-' ; $caller[1] =~ s/.*\/// ; &UPSTAT("A2P-DB-DISCONNECT-@caller"); if (defined($DBH)) { # Check if profile was enabled and then dump as Debug if (defined($DBH->{Profile}) and $DBH->{Profile}) { &Debug("DB profiling: " . $DBH->{Profile}->format); } if ($actual_commit) { # Force commit immediatly before disconnection $actual_commit = $STATS_COMMIT_SKIP ; &commit(); usleep 10000 ; } # Disconnect DB if present $DBH->disconnect(); &UPSTAT('A2P-DB-DISCONNECTED'); # Erase DBH handler undef $DBH ; # Erase Some related variables map { delete $TABLES{$_} } keys(%TABLES) ; } } ################################################################################ ### Private API ################################################################################ sub CheckTable { my $name = shift ; my $sub = shift ; return &Error("Can't check not specified table") unless (defined($name) and $name); return &Error("Not connected to DB") unless (&a2p_db_connected); return $TABLES{$name} if (defined($TABLES{$name}) and $TABLES{$name}); #&Debug("Checking table '$name' exitence"); if (!defined($TABLES{$name}) and $DBH->do("SHOW TABLES LIKE '$A2PDB_TABLE_PREFIX$name'")>0) { $TABLES{$name} = 1 ; } else { &Debug("Table '$name' seems not exists: ".$DBH->errstr()); $TABLES{$name} = 0 ; # Only one recursive call is possible unless (@_) { my $statement ; my $table_name = defined($sub) ? &$sub($name) : $name ; if (defined($CREATE_TABLE{$table_name})) { $statement = $CREATE_TABLE{$table_name} ; } else { return &Error("Table '$table_name' creation not supported"); } # Update to create table if not exists $statement =~ s/CREATE\s+TABLE\s+$table_name/CREATE TABLE IF NOT EXISTS $A2PDB_TABLE_PREFIX$name/mi ; # Also update indexes names when a prefix is set $statement =~ s/^(\s*)INDEX\s+/INDEX $A2PDB_TABLE_PREFIX/mig if ($A2PDB_TABLE_PREFIX); &Debug("Try to create table with $statement") if __A2P_DB_DEBUG__ ; $DBH->do($statement) or return &Error("Can't create '$name' table: ".$DBH->errstr()); # Force commit on creation $DBH->commit() or return &Error("Bad commit: ".$DBH->errstr()); my $check = "SHOW TABLES LIKE '$A2PDB_TABLE_PREFIX$name'" ; $TABLES{$name} = ($DBH->do($check) > 0) ? (&Debug("Table '$name' available"),1) : &Error("Table '$name' not created: ".$DBH->errstr()); } } return $TABLES{$name} ; } my $cache_timer = [ &gettimeofday() ]; sub check_commit { # Add a call to this sub in a loop to force commit after max_timer seconds # Check cache each minute to definitively remove value older than 10 minutes if ( &tv_interval($cache_timer) > 60 and defined($CACHE->{'__expiration_list'}) and @{$CACHE->{'__expiration_list'}}) { my @temp = () ; # Don't pass more than one second to purge the cache my $timeout_timer = [ &gettimeofday() ]; while ( @{$CACHE->{'__expiration_list'}} and &tv_interval($timeout_timer) < 1 ) { my $oldwhat = shift @{$CACHE->{'__expiration_list'}} ; my $value_ref = &recursive_ref($CACHE,@{$oldwhat}); if (ref($value_ref) ne 'ARRAY') { &Warn("(@{$oldwhat}) gives no valid value from cache, " . "cache may be corrupted"); next ; } elsif (&tv_interval($value_ref->[1])<600) { # Keep too young values push @temp, $oldwhat ; next ; } # Here this is a value to remove from cache while (@{$oldwhat}) { my $key = pop @{$oldwhat} ; my $ref = &recursive_ref($CACHE,@{$oldwhat}) or last; if (ref($ref) =~ /^HASH/) { delete $ref->{$key} ; # Don't delete hash ref if other value are cached here last if (keys(%{$ref})); } } } # Update expiration list with not expired kept list if (@{$CACHE->{'__expiration_list'}}) { push @{$CACHE->{'__expiration_list'}}, @temp ; } else { $CACHE->{'__expiration_list'} = \@temp ; } # Reset cache timer $cache_timer = [ &gettimeofday() ]; } return 1 unless ( $actual_commit and &tv_interval($commit_timer) > $max_timer ); # Check to adapt futur timer check, reduce max_timer, it will be added # with 1 second after the commit call, so it will be corrected just after my $reduce_max_timer = (@commit_timers < $STATS_COMMIT_SKIP>>1) ; # Only do a correction if not enough commit has occured, here not enought # means commits rate is lower than half of expected rate # Keep only timers lower than current max_timer interval while (@commit_timers and &tv_interval($commit_timers[0]) > $max_timer ) { shift @commit_timers ; } # Reset timer for next commit check $commit_timer = [ &gettimeofday() ] ; $actual_commit = $STATS_COMMIT_SKIP ; &commit(); # max_timer is added with one second if lower than 10 # Then apply correction (so will be -2 first time, than -1) $max_timer -= ($max_timer>2 and $reduce_max_timer) ? 2 : 0 ; } sub commit { # Do nothing if AutoCommit if activated return defined($actual_commit = 0) if (defined($DBH->{AutoCommit}) and $DBH->{AutoCommit}); # Keep call time push @commit_timers, [ &gettimeofday() ] ; return 1 unless ( ++ $actual_commit > $STATS_COMMIT_SKIP ); # Update max_timer if still not to 10 seconds $max_timer ++ if ( $max_timer < 10 ); $actual_commit = 0 ; $DBH->commit() or return &Error("Bad commit: ".$DBH->errstr()); 1 ; } sub get_table_hash { my $table = shift ; my $key = shift ; my $internal_stat_key = 'DB-GET-TABLE-HASH-' . uc($table) ; &TIMESTAT($internal_stat_key); &UPSTAT($internal_stat_key); return &Error("Can't get not specified table") unless (defined($table) and $table); return &Error("Can't get hash with unspecified key") unless (defined($key) and $key); unless (exists($TABLES{$table})) { &Debug("Force checking '$table' table is available"); &CheckTable($table); } return &Error("Can't get hash for unknown table '$table'") unless (defined($TABLES{$table}) and $TABLES{$table}); my $sth = $DBH->prepare_cached("SELECT * FROM $A2PDB_TABLE_PREFIX$table") or return &Error("Can't prepare request to get '$table' table: " . $DBH->errstr()); $sth->execute() or return &Error("Can't get '$table' table: ".$DBH->errstr()); my $hashref ; # Check if we can call fetchall_hashref with an ARRAY ref if ( ref($key) =~ /^ARRAY/ and $DBI::VERSION < 1.48 ) { # Simulate the same API than later DBI version my $arrayref = $sth->fetchall_arrayref({}) or return &Error("Can't get '$table' table as array of hash: " . $DBH->errstr()); $hashref = {} ; while (@{$arrayref}) { my $row = shift @{$arrayref} ; my $newref = $hashref ; my @keys = @{$key} ; while (@keys) { my $nextkey = shift @keys ; my $this = $row->{$nextkey} ; # Finally affect the row to the last ref $newref->{$this} = @keys ? {} : $row unless (exists($newref->{$this})); $newref = $newref->{$this} ; } } } else { $hashref = $sth->fetchall_hashref($key) or return &Error("Can't get '$table' table as hash: " . $DBH->errstr()); } $sth->finish ; &MAXSTAT($internal_stat_key.'-KEYCOUNT',scalar(keys(%{$hashref}))); &TIMESTAT($internal_stat_key); return $hashref ; } ################################################################################ ### Statistics API ################################################################################ my $Statistics_ready = undef ; sub a2p_db_statistics_ready { return $Statistics_ready if (defined($Statistics_ready)); # Check needed tables are availables foreach my $table (keys(%CREATE_TABLE)) { next if ($table =~ /jobs/); &CheckTable($table) or return $Statistics_ready = 0 ; } return $Statistics_ready = 1 ; } sub CheckJobsTables { my $date = shift || '' ; ( $MONTH ) = $date =~ /^(\d{6})/ ; &CheckTable('afpjobs_'.$MONTH,\&job_tablename_check) or return &Error("No table available for afpjobs_$MONTH month"); &CheckTable('jobs_'.$MONTH,\&job_tablename_check) or return &Error("No table available for jobs_$MONTH month"); } sub job_tablename_check { my $name = shift || '' ; &Debug("Checking table name '$name'"); return $name =~ /^(\w*jobs_)/ ? $1 . 'YYYYMM' : $name ; } sub stats_set_service { $service = shift || 'unknown' ; } sub mysql_insert_id { my $select = $DBH->prepare_cached(qq{ SELECT LAST_INSERT_ID() }); $select->execute() or return &Error("Can't get last inserted id: ".$DBH->errstr()); my ( $last_id ) = $select->fetchrow_array ; $select->finish ; return $last_id ? $last_id : &Error("Can't get valid last inserted id") ; } sub recursive_ref { my $ref = shift ; return $ref unless @_ ; my $key = shift ; unless (ref($ref) =~ /^HASH/i and exists($ref->{$key})) { &Debug("No value referenced as $ref -> $key") if __A2P_DB_DEBUG__ ; return 0 ; } return &recursive_ref( $ref->{$key}, @_ ); } sub cached { my $what = shift || [] ; my $value = undef ; return $value unless ( @{$what} > 1 ) ; if (@_) { my $which_list = ($what->[0]).'__list' ; $value = shift ; # Set cache for a value push @{$CACHE->{$which_list}}, [ @{$what} ] ; # Push old values in a list with expiration checked later if (@{$CACHE->{$which_list}} > $MAX_CACHED_DB) { my $oldwhat = shift @{$CACHE->{$which_list}} ; push @{$CACHE->{'__expiration_list'}}, $oldwhat ; } # Prepare hash tree cache &Debug("Caching $value as ".join('=>',@{$what})) if __A2P_DB_DEBUG__ ; my $whatref = $CACHE ; my $whatkey = shift @{$what} ; while (@{$what}) { unless (defined($whatref->{$whatkey})) { $whatref->{$whatkey} = {} ; &Debug("Cache hash $whatref -> $whatkey initialized") if __A2P_DB_DEBUG__ ; } $whatref = $whatref->{$whatkey} ; $whatkey = shift @{$what} ; } # Set cache and return now to not update cache statistics $whatref->{$whatkey} = [ $value , [ &gettimeofday() ]]; &Debug("$value cache as $whatref -> $whatkey") if __A2P_DB_DEBUG__ ; } elsif (defined($CACHE)) { my $value_ref = &recursive_ref($CACHE,@{$what}); if ($value_ref and ref($value_ref) =~ /^ARRAY/) { $value = $value_ref->[0] ; # Update value timer $value_ref->[1] = [ &gettimeofday() ] ; } # Update internal hit/miss cache statistics defined($value) ? &UPSTAT('CACHE-HIT--'.($what->[0])) : &UPSTAT('CACHE-MISS-'.($what->[0])) ; } return $value ; } sub set_afpjid_from { my $JobID = shift ; return &Error("No JobID provided") unless $JobID ; &Warn("Trying to set afpjid for $JobID and $MONTH without serviceid set") unless ($serviceid); my $cache_key_list = [ 'afpjid', $MONTH, $serviceid, $JobID ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $afpjid = $cached if (defined($cached) and $cached); my $select = $DBH->prepare_cached(qq{ SELECT afpjid FROM ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH WHERE JobID=? and serviceid=? }); $select->execute($JobID,$serviceid) or return &Error("Can't select afpjid from '$JobID' JobID " . "in afpjobs table: " . $DBH->errstr()); ( $afpjid ) = $select->fetchrow_array ; $select->finish ; unless (defined($afpjid) and $afpjid) { # Insert a new row for this job my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH (JobID,serviceid,Date,DayTime,State,Errors,NBJobs,Chrono,Counted) VALUES(?,?,?,?,' ',0,0,0,0) }); $insert->execute($JobID,$serviceid,$JOB->[0],$JOB->[1]) or return &Error("Can't insert '$JobID' JobID in afpjobs table: " . $DBH->errstr()); $afpjid = &mysql_insert_id() if (&commit()); } return $afpjid = &Error("Can't get a valid afpjid") unless (defined($afpjid) and $afpjid); &Debug("Set afpjid to $afpjid") if (__A2P_DB_DEBUG__ and defined($afpjid)); return &cached($cache_key_list, $afpjid); } sub set_countid { return &Error("Can't set new counter without valid serviceid") unless (defined($serviceid) and $serviceid); my $cache_key_list = [ 'countid', $MONTH, $serviceid ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $countid = $cached if (defined($cached) and $cached); # Search for countid my $select = $DBH->prepare_cached(qq{ SELECT countid FROM ${A2PDB_TABLE_PREFIX}Count WHERE serviceid=? and YearMonth=? }); $select->execute($serviceid,$MONTH) or return &Error("Can't select countid in Count table: " . $DBH->errstr()); ( $countid ) = $select->fetchrow_array ; $select->finish ; unless ((defined($countid) and $countid) or @_) { # Insert a new row for this service and month my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}Count (serviceid,YearMonth,afpjobs_nb,afpjobs_tps,afpjobs_err, jobs_nb,jobs_err,pages_nb) VALUES(?,?,0,0,0,0,0,0) }); $insert->execute($serviceid,$MONTH) or return &Error("Can't insert counter in Count table: " . $DBH->errstr()); $countid = &mysql_insert_id() if (&commit()); } return $countid = &Error("Can't get a valid countid") unless (defined($countid) and $countid); &Debug("Set countid to '$countid'") if __A2P_DB_DEBUG__ ; return &cached($cache_key_list, $countid) ; } sub set_ddid { my $cache_key_list = [ 'ddid', $destid, $docid ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $ddid = $cached if (defined($cached) and $cached); # Search for ddid my $select = $DBH->prepare_cached(qq{ SELECT ddid FROM ${A2PDB_TABLE_PREFIX}DocDestCount WHERE destid=? AND docid=? }); $select->execute($destid,$docid) or return &Error("Can't select ddid in DocDestCount table: " . $DBH->errstr()); ( $ddid ) = $select->fetchrow_array ; $select->finish ; unless (defined($ddid) and $ddid) { # Insert a new row for this document my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}DocDestCount (destid,docid,FirstTime,LastTime,Count) VALUES(?,?,?,?,0) }); $insert->execute($destid,$docid,$JOB->[0],$JOB->[0]) or return &Error("Can't insert docdest counter " . "in DocDestCount table: " . $DBH->errstr()); $ddid = &mysql_insert_id() if (&commit()); } return $ddid = &Error("Can't get a valid ddid") unless (defined($ddid) and $ddid); &Debug("Set ddid to '$ddid'") if __A2P_DB_DEBUG__ ; return &cached($cache_key_list, $ddid) ; } sub set_destid_from { my $DestID = shift || '(none)' ; $DestID = '(NULL)' if ($DestID =~ /^NULL$/i); $DestID = '(none)' if ($DestID =~ /^\s*$/); my $cache_key_list = [ 'destid', $DestID ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $destid = $cached if (defined($cached) and $cached); # Get serviceid from table or insert a new one my $destinations = &get_table_hash('Destinations','DestIdName'); return &Error("Can't get Destinations hash") unless (defined($destinations) and $destinations); $destid = $destinations->{$DestID}->{'destid'} if (defined($destinations->{$DestID})); unless (defined($destid) and $destid) { # Here we need to insert a new service and get the new serviceid &Debug("Inserting new DestID '$DestID' after last '$destid' one") if __A2P_DB_DEBUG__ ; my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}Destinations (DestIdName, FirstTime, LastTime, Count) VALUES(?,?,?,0) }); $insert->execute($DestID,$JOB->[0],$JOB->[0]) or return &Error("Can't insert '$DestID,$JOB->[0],$JOB->[0],0' " . "in Destinations table: ".$DBH->errstr()); # Get last inserted id $destid = &mysql_insert_id() if (&commit()); } return $destid = &Error("Can't get a valid destid") unless (defined($destid) and $destid); &Debug("Last inserted destination has '$destid' id") if __A2P_DB_DEBUG__ ; return &cached($cache_key_list, $destid ); } sub set_docid { # Set cache list avoiding empty keys my $cache_key_list = [ 'docid', @{$JOB}[5..7], map { $_.'_' } @{$JOB}[8..10] ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $docid = $cached if (defined($cached) and $cached); # Search for docid my $select = $DBH->prepare_cached(qq{ SELECT docid FROM ${A2PDB_TABLE_PREFIX}Documents WHERE PageDef=? AND FormDef=? AND DocName=? AND PageFormats=? AND CopyGroups=? AND Logos=? }); $select->execute(@{$JOB}[5..10]) or return &Error("Can't select docid in Documents table: " . $DBH->errstr()); ( $docid ) = $select->fetchrow_array ; $select->finish ; unless (defined($docid) and $docid) { # Insert a new row for this document my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}Documents (PageDef,FormDef,DocName,PageFormats,CopyGroups, Logos,FirstTime,LastTime,Count) VALUES(?,?,?,?,?,?,?,?,0) }); $insert->execute(@{$JOB}[5..10],$JOB->[0],$JOB->[0]) or return &Error("Can't insert document in Documents table: " . $DBH->errstr()); # Get last inserted id $docid = &mysql_insert_id() if (&commit()); } return $docid = &Error("Can't get a valid docid") unless (defined($docid) and $docid); &Debug("Set docid to '$docid'") if __A2P_DB_DEBUG__ ; return &cached($cache_key_list, $docid); } sub set_jid_from { my $Number = shift || 0 ; my $cache_key_list = [ 'jid', $MONTH, $afpjid, $Number ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $jid = $cached if (defined($cached) and $cached); my $select = $DBH->prepare_cached(qq{ SELECT jid FROM ${A2PDB_TABLE_PREFIX}jobs_$MONTH WHERE afpjid=? and Number=? }); $select->execute($afpjid,$Number) or return &Error("Can't select jid from '$Number' Number " . " in jobs table: " . $DBH->errstr()); ( $jid ) = $select->fetchrow_array ; $select->finish ; unless (defined($jid) and $jid) { # Insert a new row for this job my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}jobs_$MONTH (afpjid,Number,destid,docid, NbPages,State,Timing,Chrono,Kind,Errors,Counted,Jobname) VALUES(?,?,?,0,0,' ',0,0,' ',0,0,'') }); $insert->execute($afpjid,$Number,$destid) or return &Error("Can't insert job '$Number' for '$afpjid' afpjob " . " in jobs table: " . $DBH->errstr()); $jid = &mysql_insert_id() if (&commit()); } return $jid = &Error("Can't get a valid jid") unless (defined($jid) and $jid); &Debug("Set jid to '$jid'") if (__A2P_DB_DEBUG__); return &cached($cache_key_list, $jid); } sub set_serviceid_from { my $LockID = shift || 'unknown' ; my $cache_key_list = [ 'serviceid', $Server, $service, $LockID ] ; # Return cached value my $cached = &cached($cache_key_list) ; return $serviceid = $cached if (defined($cached) and $cached); &Debug("Services table not cached") if __A2P_DB_DEBUG__ ; # Get serviceid from table or insert a new one my $Services = &get_table_hash('Services','serviceid') or return &Error("Can't get Services hash"); # Parse DB table returned as hash my $found = 0 ; foreach my $id (keys(%{$Services})) { next unless ($Services->{$id}->{Name} =~ /^$service$/ ); next unless ($Services->{$id}->{Server} =~ /^$Server$/i ); next unless ($Services->{$id}->{LockID} =~ /^$LockID$/ ); &Debug("Extracted serviceid '$id' from Services table") if __A2P_DB_DEBUG__ ; $found ++ ; $serviceid = $id ; last ; } unless ($found and $serviceid) { # Here we need to insert a new service and get the new serviceid &Debug("Inserting new service '$service,$Server,$LockID' after last " . "'$serviceid' one") if __A2P_DB_DEBUG__ ; my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}Services (Name, Server, LockID) VALUES(?,?,?) }); $insert->execute($service,$Server,$LockID) or return &Error("Can't insert '$service,$Server,$LockID' " . "in Services table: " . $DBH->errstr()); # Get last inserted id $serviceid = &mysql_insert_id() if (&commit()); &Debug("Last inserted service has $serviceid id") if __A2P_DB_DEBUG__ ; } return $serviceid = &Error("Can't get a valid serviceid") unless (defined($serviceid) and $serviceid); &Debug("Keeping $serviceid id in cache") if __A2P_DB_DEBUG__ ; $cached = &cached($cache_key_list, $serviceid); &Debug("$cached value kept in cache") if __A2P_DB_DEBUG__ ; return $cached ; } sub stats_converter { $JOB = shift ; return &Error("Converter line: Not connected to DB") unless (&a2p_db_connected()); # Reset global value for that line ( $afpjid , $jid , $countid, $serviceid, $destid ) = ( 0, 0, 0, 0, 0 ); ( $docid , $ddid ) = ( 0, 0 ); # 1. Check tables for job are availables &CheckJobsTables($JOB->[0]) or return &Error("Converter line: " . "Tables not available for $JOB->[0] date"); # 2. Update serviceid as we known LOCKID &set_serviceid_from($JOB->[13]) or return &Error("Converter line: Can't find DB serviceid"); # 3. Set countid &set_countid() or return &Error("Converter line: Can't find DB countid"); # 4. Set afpjid &set_afpjid_from($JOB->[2]) or return &Error("Converter line: Can't find DB afpjid"); # 7. Check if it has been counted my $select = $DBH->prepare_cached(qq{ SELECT Counted FROM ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH WHERE afpjid=? }); $select->execute($afpjid) or return &Error("Can't select job row from '$jid' jid in jobs table: " . $DBH->errstr()); my ( $afpjid_counted ) = $select->fetchrow_array ; $select->finish ; # 5. Set destid &set_destid_from( $JOB->[4] ) or return &Error("Converter line: Can't find DB destid"); # 6. Set jid my ( $number ) = $JOB->[3] =~ /^.*-AFP(\d+)$/ ; &set_jid_from( $number ) or return &Error("Converter line: Can't find DB jid"); # 7. Check if it has been counted $select = $DBH->prepare_cached(qq{ SELECT Counted FROM ${A2PDB_TABLE_PREFIX}jobs_$MONTH WHERE jid=? }); $select->execute($jid) or return &Error("Can't select job row from '$jid' jid in jobs table: " . $DBH->errstr()); my ( $counted ) = $select->fetchrow_array ; $select->finish ; # 8. Still return if it still has been counted return $counted if ($counted & 1); # 9. Set docid &set_docid() or return &Error("Converter line: Can't find DB docid"); # 10. Set ddid &set_ddid() or return &Error("Converter line: Can't find DB ddid"); # 11. Set session variable my $settime = $DBH->prepare_cached(q{ SET @THISTIME = ?, @THISHOUR = ? }); $settime->execute($JOB->[0],$JOB->[1]) or return &Error("Converter line: " . "Can't set THISTIME to '$JOB->[0]': " . $DBH->errstr()); # 12. Update row in jobs table my $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}jobs_$MONTH SET docid=?, Jobname=?, NbPages=?, Kind=?, Counted=Counted | 1 WHERE jid=? }); $update->execute($docid,$JOB->[3],$JOB->[11],$JOB->[12],$jid) or return &Error("Converter line: " . "Can't update row '$jid' in jobs_$MONTH table: " . $DBH->errstr()); # 13. At the same time, update jobs_nb and pages_nb if ($JOB->[11] =~ /^\d+$/) { $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}Count SET afpjobs_nb=afpjobs_nb+?, jobs_nb=jobs_nb+1, pages_nb=pages_nb+$JOB->[11] WHERE countid=? }); $update->execute( $afpjid_counted ? 0 : 1 , $countid ) or return &Error("Converter line: " . "Can't update $countid row in Count table: " . $DBH->errstr()); } # 14. Update also Destination counter $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}Destinations SET Count=Count+1, FirstTime=IF(FirstTime<=\@THISTIME,FirstTime,\@THISTIME), LastTime=IF(LastTime>=\@THISTIME,LastTime,\@THISTIME) WHERE destid=? ; }); $update->execute($destid) or return &Error("Converter line: " . "Can't update $destid row in Destinations table: " . $DBH->errstr()); # 15. Update also Document counter $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}Documents SET Count=Count+1, FirstTime=IF(FirstTime<=\@THISTIME,FirstTime,\@THISTIME), LastTime=IF(LastTime>=\@THISTIME,LastTime,\@THISTIME) WHERE docid=? }); $update->execute($docid) or return &Error("Converter line: " . "Can't update $docid row in Documents table: " . $DBH->errstr()); # 16. Update also DocDest counter $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}DocDestCount SET Count=Count+1, FirstTime=IF(FirstTime<=\@THISTIME,FirstTime,\@THISTIME), LastTime=IF(LastTime>=\@THISTIME,LastTime,\@THISTIME) WHERE ddid=? }); $update->execute($ddid) or return &Error("Converter line: " . "Can't update $ddid row in DocDestCount table: " . $DBH->errstr()); # 17. Update also NBJobs counter in afpjobs $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH SET NBJobs=NBJobs+1, Counted=Counted | 1, Date=IF(Date<=\@THISTIME,Date,\@THISTIME), DayTime=IF(DayTime<=\@THISHOUR,DayTime,\@THISHOUR) WHERE afpjid=? }); $update->execute($afpjid) or return &Error("Converter line: " . "Can't update $afpjid row in afpjobs_$MONTH table: " . $DBH->errstr()); # Finally. Commit changes and return commit success return &commit() ; } sub stats_jobmanager { $JOB = shift ; return &Error("JobManager line: Not connected to DB") unless (&a2p_db_connected()); ( $afpjid , $jid , $countid, $serviceid, $destid ) = ( 0, 0, 0, 0, 0 ); # 1. Check tables for job are availables &CheckJobsTables($JOB->[0]) or return &Error("JobManager line: Tables not available for " . $JOB->[0] . " date"); # 2. Update serviceid as we known LOCKID &set_serviceid_from($JOB->[8]) or return &Error("JobManager line: Can't find $JOB->[8] DB serviceid"); # 3. Set countid &set_countid() or return &Error("JobManager line: Can't find DB countid"); if ($JOB->[4]) { # 4. Set afpjid as extrated from jid my ( $JobID ) = $JOB->[2] =~ /^(.*)-\d+$/ ; &set_afpjid_from( $JobID ) or return &Error("JobManager line: Can't find DB afpjid for job"); # 5. Set destid &set_destid_from( $JOB->[3] ) or return &Error("JobManager line: Can't find DB destid"); # 6. Set jid my ( $number ) = $JOB->[2] =~ /^.*-(\d+)$/ ; &set_jid_from( $number ) or return &Error("JobManager line: Can't find DB jid"); # 7. Check if it has been counted my $select = $DBH->prepare_cached(qq{ SELECT Counted FROM ${A2PDB_TABLE_PREFIX}jobs_$MONTH WHERE jid=? }); $select->execute($jid) or return &Error("JobManager line: " . "Can't check if counted in jobs table: " . $DBH->errstr()); my ( $counted ) = $select->fetchrow_array ; $select->finish ; # 8. Still return if it still has been counted return $counted if ($counted & 2); # 9. Update row in jobs table my $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}jobs_$MONTH SET State=?, Errors=?, Timing=?, Chrono=?, Counted=Counted | 2 WHERE jid=? }); my ( $chrono ) = $JOB->[7] =~ /^([0-9.]+)/ ; my ( $timing ) = $JOB->[9] =~ /^([0-9.]+)/ ; $update->execute($JOB->[4],$JOB->[6],$timing,$chrono,$jid) or return &Error("JobManager line: " . "Can't update row '$jid' in jobs_$MONTH table: " . $DBH->errstr()); # 10. At the same time, update jobs_err when some found if ($JOB->[6] =~ /^\d+$/ and $JOB->[6]) { $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}Count SET jobs_err=jobs_err+$JOB->[6] WHERE countid=? }); $update->execute($countid) or return &Error("JobManager line: " . "Can't update $countid row in Count table: " . $DBH->errstr()); } } else { # 4. Set afpjid &set_afpjid_from($JOB->[2]) or return &Error("JobManager line: Can't find DB afpjid of afpjob"); # 5. Check if it has been counted my $select = $DBH->prepare_cached(qq{ SELECT Counted FROM ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH WHERE afpjid=? }); $select->execute($afpjid) or return &Error("JobManager line: " . "Can't check if counted in afpjobs table: " . $DBH->errstr()); my ( $counted ) = $select->fetchrow_array ; $select->finish ; # 6. Still return if it still has been counted return $counted if ($counted & 2); # 7. Update row in afpjobs table my $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}afpjobs_$MONTH SET State=?, Errors=?, Chrono=?, Counted=Counted | 2 WHERE afpjid=? }); my ( $chrono ) = $JOB->[7] =~ /^([0-9.]+)/ ; $update->execute($JOB->[5],$JOB->[6],$chrono,$afpjid) or return &Error("JobManager line: " . "Can't update row '$afpjid' in afpjobs_$MONTH table: " . $DBH->errstr()); # 8. At the same time, update afpjobs_nb and afpjobs_err Count if ($JOB->[6] =~ /^\d+$/ and $JOB->[6]) { $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}Count SET afpjobs_nb=afpjobs_nb+?, afpjobs_tps=afpjobs_tps+?, afpjobs_err=afpjobs_err+$JOB->[6] WHERE countid=? }); } else { $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}Count SET afpjobs_nb=afpjobs_nb+?, afpjobs_tps=afpjobs_tps+? WHERE countid=? }); } $update->execute($counted ? 0 : 1 , $chrono/1000, $countid) or return &Error("JobManager line: " . "Can't update $countid row in Count table: " . $DBH->errstr()); } # Finally. Commit changes and return commit success return &commit() ; } ################################################################################ ### JobStatus API ################################################################################ my $service_list = 0 ; my $service_list_timer = [] ; sub CheckJobsStatusTables { &CheckTable('Services') or return &Error("No 'Services' table available for managing Jobs status"); return &CheckTable('jobs_status') || &Error("No 'jobs_status' table available for Jobs status"); } sub CheckJobsQueueTable { return &CheckTable('jobs_queue') || &Error("No 'jobs_queue' table available for Jobs management"); } sub get_serviceid_info { # API to retrieve the services sid of local services &UPSTAT('JOBSTATUS-DB-GET-SERVICES-TRY'); # Update service list ref only one time per minute if ($service_list and @{$service_list_timer} and not @_) { return $service_list unless (&tv_interval($service_list_timer) > 60); &UPSTAT('JOBSTATUS-DB-GET-SERVICES-RECHECK'); } # Get serviceid from table my $Services = &get_table_hash('Services', [qw(Server Name LockID)] ) or return &Error("Can't get Services hash"); # Set timer $service_list_timer = [ &gettimeofday() ] ; # Get infos object if still not created $service_list = new A2P::Infos() unless ($service_list) ; if ( defined($_[0]) and $_[0] =~ /check all/i ) { # Compute service id for each servers to cache them foreach my $server ( keys(%{$Services}) ) { my $hash = $Services->{$server} ; &Debug("Caching $server sid"); $service_list = $service_list->get_serviceid_of( $hash ); } # Return A2P::Infos object return $service_list ; } elsif ( ! exists($Services->{$Server}) or ref($Services->{$Server}) !~ /^HASH/ ) { # No entry in Services table will need to initialize it return &Debug("No entry in table for '$Server' (@_)"); } # Initialize/update A2P::Infos object with the ref for this server # It returns 0 or itself as A2P::Infos object return $service_list->get_serviceid_of( $Services->{$Server} ); } sub _ms { # Get current millisecond my @time = &gettimeofday() ; # If a parameter is defined, we want to add the seconds with it. It could be # a negative number to get a timer to compare $time[0] += shift if @_ ; return int( $time[0] * 1000 + $time[1]/1000 ); } sub job_status_update { my $sid = shift || 0 ; my $lockid = shift ; my $job = shift ; $serviceid = $service_list ? $service_list->get_serviceid_of($lockid) : 0 ; # If serviceid is not known we should try to re-initialize Infos object as # the serviceid could just has been inserted in table unless ($serviceid) { &get_serviceid_info('check now'); # Just any arg to avoid timer check $serviceid = $service_list->get_serviceid_of($lockid) || 0 ; } # Finally the serviceid not exists in table we should create a new one unless ($serviceid) { # Use default statistics APIs my $sname = $service_list->get_servicename_of($lockid) || '' ; return &Error("Can't handle service $lockid without configuration") unless $sname ; &stats_set_service($sname); &set_serviceid_from($lockid); unless ($serviceid) { &get_serviceid_info('recheck now'); $serviceid = $service_list->get_serviceid_of($lockid) || 0 ; } return &Error("Can't set new serviceid for $lockid") unless $serviceid ; } # Prepare row to insert or update my @row = map { defined($_) ? $_ : "" } $serviceid, $job, @_[0..2], join("",@{$_[3]}), @_[4..10] ; # Check if we can find the job in DB if it has no sid as it could has been # run in on another computer and/or service. Check also if we are rechecking # status after an a2p-status service restart unless ( $sid =~ /^\d+$/ and $sid ) { my $select = $DBH->prepare_cached(qq{ SELECT sid FROM ${A2PDB_TABLE_PREFIX}jobs_status WHERE Jobname=? OR ( serviceid=? and JobID=? ) }); &UPSTAT('JOBSTATUS-DB-CHECKSID'); &Debug("Check if job $row[8] is still in DB"); $select->execute( $row[8], $serviceid, $job ) or return &Error("Can't check SID in jobs_status table: ". $DBH->errstr()); my ( $thissid ) = $select->fetchrow_array ; $select->finish ; $sid = $thissid if (defined($thissid) and $thissid and $thissid =~ /^\d+$/); } if ($sid) { &TIMESTAT('JOBSTATUS-DB-UPDATE'); # Prepare to update a row if revision is not up to date my $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}jobs_status SET serviceid=?, JobID=?, Revision=?, Timer=?, Step=?, Steps=?, Born=?, Date=?, JobName=?, CurrentStatus=?, NbJobs=?, DestIDs=?, Infos=? WHERE sid = ? AND (Revision < ? OR Born < ?) }); &Debug("Update row $sid for $job rev$row[2] with '@row'"); &UPSTAT('JOBSTATUS-DB-UPDATE'); $update->execute(@row, $sid, $row[2], $row[6]) or $sid = &Error("Can't update $job db jobstatus: ".$DBH->errstr()); if ($sid and $DBH->commit()) { &UPSTAT('JOBSTATUS-DB-UPDATED'); } else { $sid = 0 ; &Debug("Bad commit on $job Job Status update"); } &TIMESTAT('JOBSTATUS-DB-UPDATE'); } else { # Insert a new row # Do a table optimization and purge at last every 60 minutes &Warn("jobs_status DB table not optimized and purged") unless (&job_status_purge()); &TIMESTAT('JOBSTATUS-DB-INSERT'); &Debug("Insert '@row' into jobs_status table"); # Insert new jobstatus in table my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}jobs_status (serviceid,JobID,Revision,Timer,Step,Steps, Born,Date,JobName,CurrentStatus,NbJobs,DestIDs,Infos) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?) }); &UPSTAT('JOBSTATUS-DB-INSERT'); my $executed = $insert->execute(@row) || &Error("Can't insert '$row[1]' JobID in job_status table: ". $DBH->errstr()); # Return the sid inserted if ($executed and $DBH->commit()) { $sid = &mysql_insert_id() ; &UPSTAT('JOBSTATUS-DB-INSERTED'); &MAXMINSTAT('JOBSTATUS-BORN-TO-DB-TIMING-ms', &_ms() - $row[6]); &Debug("Last insertid SID is $sid"); } elsif ($executed) { &Error("No sid retrieved for '$row[1]' Job Status insertion"); } &TIMESTAT('JOBSTATUS-DB-INSERT'); } return $sid ; } my %optimize = () ; # Time out list sub optimize_table { return unless ($DO_DB_OPTIMIZATION); my $table = shift or return "" ; # Do a table optimization if 1 hour has pasted since last optimization return 0 if ( exists($optimize{$table}) and time < $optimize{$table} ); # Protect table name return 0 unless ( $table =~ /^\w+$/ ); # Set time out to next hour $optimize{$table} = time + 3600 ; &TIMESTAT(uc($table).'-TABLE-OPTIMIZATION'); # Do table optimization and disable it if it fails unless ($DBH->do("OPTIMIZE TABLE ${A2PDB_TABLE_PREFIX}$table")) { &Info("Can't optimize $table DB table"); # Don't try to optimize until next service restart $optimize{$table} = [] ; } &TIMESTAT(uc($table).'-TABLE-OPTIMIZATION'); &UPSTAT(uc($table).'-TABLE-OPTIMIZED'); return $optimize{$table} ; } sub job_status_delete { # A valid SID must be provided my $sid = shift || return 0 ; return 0 unless ($sid =~ /^\d+$/); $DBH->do(qq{ DELETE FROM ${A2PDB_TABLE_PREFIX}jobs_status WHERE sid=$sid }) or return &Error("Can't delete row $sid in jobs_status DB table"); $DBH->commit() or &Error("Can't commit row $sid deletion in jobs_status DB table"); &UPSTAT('JOBSTATUS-DB-DELETE'); &Debug("Deleted status with sid $sid in DB"); # Do a table optimization and purge at last every 60 minutes &Warn("jobs_status DB table not optimized and purged") unless (&job_status_purge()); return 1 ; } my $purge_timout = 0 ; sub job_status_purge { # Prevent purge too soon by calling optimize_table just before which will # will return true only every half an hour return 1 unless ( $STATUS_DB_MAXAGE and time > $purge_timout ); # Reset time out $purge_timout = time + 1800 ; # Delete max_aged rows, Timer is ms avec parameter is hour my $time = time * 1000 ; my $timeout = $time - 3600000 * $STATUS_DB_MAXAGE ; unless ($NO_SYSLOG_DEBUG) { # Log which row will be deleted when debug mode is activated &Debug("Try deleting job status row at time $time ms since EPOCH..."); &Debug("Deleting any job status DB row with Timer lower than $timeout"); my $select = $DBH->prepare_cached(qq{ SELECT s.LockID,j.JobID,j.Revision,j.Timer,j.Step,j.Steps, j.Born,j.JobName,j.CurrentStatus,j.NbJobs,j.Infos,j.dbtime FROM ${A2PDB_TABLE_PREFIX}Services s, ${A2PDB_TABLE_PREFIX}jobs_status j WHERE j.Timerexecute($timeout)) { while ( my @row = $select->fetchrow_array ) { &Debug("Deleting status row: @row"); } } $select->finish ; } # DO DELETE $DBH->do(qq{ DELETE FROM ${A2PDB_TABLE_PREFIX}jobs_status WHERE CurrentStatus LIKE 'DONE' AND Timer<$timeout }) or return &Error("Can't do aged deletion in jobs_status table"); $DBH->commit() or return &Error("Can't commit aged deletion in jobs_status table"); # Finaly optimize the table &optimize_table('jobs_status'); return 1 ; } sub is_job_status_updated { my $sid = shift or return 0 ; my $rev = shift || 0 ; return 0 unless (&a2p_db_connected); my $select = $DBH->prepare_cached(qq{ SELECT s.LockID,j.JobID,j.Revision,j.Timer,j.Step,j.Steps, j.Born,j.Date,j.JobName,j.CurrentStatus,j.NbJobs FROM ${A2PDB_TABLE_PREFIX}Services s, ${A2PDB_TABLE_PREFIX}jobs_status j WHERE j.sid=? and s.serviceid=j.serviceid }); $select->execute($sid) or return &Error("Can't check $sid SID update in jobs_status table: ". $DBH->errstr()); my @row = $select->fetchrow_array ; $select->finish ; &UPSTAT('JOBSTATUS-DB-IS-UPDATED?'); # Check if revision has been updated if ( $row[2] > $rev ) { &Debug("Found new revision $row[2] for $row[1] (sid=$sid) in DB"); return @row ; } else { return () ; } } # List of accepted filter and the corresponding SQL statement to add computed # in a sprintf call with the transmitted value my %sql_filters = ( LOCKID => "s.LockID LIKE '%s'", NAME => "( j.JobID LIKE '*%s*' OR j.JobName LIKE '*%s*' )", DAY => "j.Date LIKE '%s'", STATUS => { 'any' => [ "j.CurrentStatus LIKE 'AB%'", "j.CurrentStatus LIKE 'KO'", "NOT ( j.CurrentStatus LIKE 'DO%' OR j.CurrentStatus LIKE 'KO' OR j.CurrentStatus LIKE 'AB%' )", "j.CurrentStatus LIKE 'DO%'" ], 'done' => [ "j.CurrentStatus LIKE 'DO%'" ], 'ko|abterm' => [ "j.CurrentStatus LIKE 'AB%'", "j.CurrentStatus LIKE 'KO'" ], 'done|ko|abterm' => [ "j.CurrentStatus LIKE 'AB%'", "j.CurrentStatus LIKE 'KO'", "j.CurrentStatus LIKE 'DO%'" ], }, NOTSTATUS => { 'done' => [ "j.CurrentStatus LIKE 'AB%'", "j.CurrentStatus LIKE 'KO'", "NOT ( j.CurrentStatus LIKE 'DO%' OR j.CurrentStatus LIKE 'KO' OR j.CurrentStatus LIKE 'AB%' )" ], 'done|ko|abterm' => [ "NOT ( j.CurrentStatus LIKE 'DO%' OR j.CurrentStatus LIKE 'KO' OR j.CurrentStatus LIKE 'AB%' )" ], 'ko|abterm' => [ "NOT ( j.CurrentStatus LIKE 'DO%' OR j.CurrentStatus LIKE 'KO' OR j.CurrentStatus LIKE 'AB%' )", "j.CurrentStatus LIKE 'DO%'" ] }, DESTID => "j.DestIDs LIKE '*%s*'" ); my $get_next_select ; my $get_next_count ; my @statements = () ; sub get_next_jobstatus { &a2p_db_connect unless $DBH ; unless (defined($get_next_select) and $get_next_select) { my $sid = shift || 0 ; $sid = 0 unless ( $sid =~ /^\d+$/ ); my $sql = '' ; # Prepare SQL template my $template = qq{ SELECT j.sid as SID, s.LockID as LOCKID, j.JobID as JOBID, j.Revision as REV, j.Timer as TIMER, j.Step as STEP , j.Steps as STATE, j.Born as BORN, j.Date as DAY, j.JobName as AFP, j.CurrentStatus as STATUS, j.NbJobs as NBJOBS, j.DestIDs as DESTIDS, j.Infos as INFOS, j.dbtime as DBTIME FROM ${A2PDB_TABLE_PREFIX}Services s, ${A2PDB_TABLE_PREFIX}jobs_status j WHERE j.sid>$sid AND s.serviceid=j.serviceid %s ORDER BY SID DESC %s } ; $get_next_count = 0 ; # Compute a SQL statement to add as WHERE to get faster requests my $filter = "" ; my $limit = "" ; # Filters must be provided in a hash as column/like value pairs if ( @_ and defined($_[0]) and $_[0] ) { my $hash = shift ; # Check if we are limited in row number to retrieve if ( exists($hash->{LIMIT}) ) { $limit = " LIMIT 0," . $hash->{LIMIT} if ( $hash->{LIMIT} =~ /^\d+$/ ); delete $hash->{LIMIT} ; } # Check status filter my @Status = () ; my ( $status_key ) = grep { /STATUS/i } keys(%{$hash}) ; if ( $status_key ) { @Status = @{$sql_filters{$status_key}->{$hash->{$status_key}}} ; delete $hash->{$status_key} ; } else { @Status = @{$sql_filters{'STATUS'}->{'any'}} ; } # Check other keys to prepare filter foreach my $key ( keys(%{$hash}) ) { next unless (exists($sql_filters{$key})); my $value = $hash->{$key} ; if (ref($sql_filters{$key}) =~ /^HASH/i) { $filter .= " AND " . $sql_filters{$key}->{$value} ; } else { # Strip illegal chars $value =~ s/[^%*0-9A-Za-z_.-]//g ; $filter .= " AND " . sprintf($sql_filters{$key},$value) ; } } # Prepare definitive SQL @statements = map { sprintf( $template, ' AND ' . $_ . $filter, $limit ) } @Status ; $sql = shift @statements ; } else { # Set default statement $sql = sprintf( $template, '' ); @statements = () ; } $get_next_select = $DBH->prepare_cached($sql); $get_next_select->execute() or return $get_next_select = &Error("Can't get jobs status in " . "jobs_status table: " . $DBH->errstr()); } my $row = $get_next_select->fetchrow_hashref ; # Try next statements in list when available while ( !(defined($row) or ref($row) =~ /^HASH/) and @statements ) { if ($DBH->errstr) { &Error("Problem when retrieving jobs status in jobs_status table: ". $DBH->errstr()); } $get_next_select->finish ; my $sql = shift @statements ; if ( defined($sql) and $sql ) { $get_next_select = $DBH->prepare_cached($sql); $get_next_select->execute() or return $get_next_select = &Error("Can't get jobs status " . "with next statement: " . $DBH->errstr()); $row = $get_next_select->fetchrow_hashref ; } } unless (defined($row) and ref($row) =~ /^HASH/) { &MAXSTAT('JOBSTATUS-DB-GET',$get_next_count); if ($DBH->errstr) { &Error("Problem when retrieving jobs status in jobs_status table: ". $DBH->errstr()); } $get_next_select->finish ; $get_next_select = undef ; &a2p_db_disconnect ; } return $row ; } sub job_queue_update { my $jid = shift || 0 ; my $mode = shift || 0 ; my $serv = shift || 0 ; my $name = shift || ""; my $afp = shift || "" ; my $rev = shift || 0 ; # set it to negative value to delete the jid ref row my $data = shift ; &a2p_db_connect unless $DBH ; if (!defined($data)) { # This is a synchronization check or a delete request if ( $jid and $rev >= 0 ) { &TIMESTAT('A2PJOB-DB-SYNC'); # Prepare to update a row if revision is not up to date my $synchro = $DBH->prepare_cached(qq{ SELECT Revision, Object FROM ${A2PDB_TABLE_PREFIX}jobs_queue WHERE jid = ? }); &Debug("Syncing $name rev$rev in jobs_queue table"); &UPSTAT('A2PJOB-DB-SYNC'); $synchro->execute($jid) or $jid = &Error("Can't update $name in db: ".$DBH->errstr()); my @synchro = $synchro->fetchrow_array ; $synchro->finish ; &TIMESTAT('A2PJOB-DB-UPDATE'); return $synchro[0] > $rev ? $synchro[1] : undef ; } elsif ( $jid and $jid =~ /^\d+$/ and $rev < 0 ) { &TIMESTAT('A2PJOB-DB-DELETE'); &Debug("Deleting row $jid of $name in jobs_queue table"); # Request deletion in DB my $result = $DBH->do(qq{ DELETE FROM ${A2PDB_TABLE_PREFIX}jobs_queue WHERE jid=$jid }) || &Error("Can't delete row $jid in jobs_queue DB table"); $result = $DBH->commit() || &Error("Can't commit row $jid deletion in jobs_queue table") if ($result); &TIMESTAT('A2PJOB-DB-DELETE'); return undef unless $result ; &UPSTAT('A2PJOB-DB-DELETE'); # Do a table optimization at some time &optimize_table('jobs_queue'); return $result ; } elsif ($jid) { &Error("Invalid '$jid' jid detected"); return undef ; } else { &Error("Can't synchronize '$name' without jid set"); return undef ; } } # Check if jobname is in DB and then get the jid if (!$jid and ($name or $afp)) { &TIMESTAT('A2PJOB-DB-FIND-JID'); # Prepare to update a row if revision is not up to date my $select ; if ($name) { $select = $DBH->prepare_cached(qq{ SELECT jid FROM ${A2PDB_TABLE_PREFIX}jobs_queue WHERE JobID=? }); } else { $select = $DBH->prepare_cached(qq{ SELECT jid FROM ${A2PDB_TABLE_PREFIX}jobs_queue WHERE Jobname=? }); } &UPSTAT('A2PJOB-DB-CHECKJID'); &Debug("Check if job $name or $afp is in DB"); $select->execute( $name ? $name : $afp ) or return &Error("Can't check JID in jobs_queue table: ". $DBH->errstr()); my ( $thisjid ) = $select->fetchrow_array ; $select->finish ; $jid = $thisjid if (defined($thisjid) and $thisjid); &TIMESTAT('A2PJOB-DB-FIND-JID'); } if ($jid) { &TIMESTAT('A2PJOB-DB-UPDATE'); # Prepare to update a row if revision is not up to date my $update = $DBH->prepare_cached(qq{ UPDATE ${A2PDB_TABLE_PREFIX}jobs_queue SET Mode=?, serviceid=?, JobID=?, JobName=?, Revision=?, Object=? WHERE jid=? }); &Debug("Updating $name rev$rev in jobs_queue table"); &UPSTAT('A2PJOB-DB-UPDATE'); $update->execute($mode, $serv, $name, $afp, $rev, $data, $jid) or $jid = &Error("Can't update $name in db: ".$DBH->errstr()); if ($jid and $DBH->commit()) { &UPSTAT('A2PJOB-DB-UPDATED'); } else { $jid = 0 ; &Warn("Bad commit on $name AfpJob update"); } &TIMESTAT('A2PJOB-DB-UPDATE'); } elsif ($name) { &TIMESTAT('A2PJOB-DB-INSERT'); &Debug("Inserting '$name', '$afp' into jobs_queue table"); # Insert new jobstatus in table my $insert = $DBH->prepare_cached(qq{ INSERT INTO ${A2PDB_TABLE_PREFIX}jobs_queue (Mode,serviceid,JobID,JobName,Revision,Object) VALUES(?,?,?,?,?,?) }); &UPSTAT('A2PJOB-DB-INSERT'); my $executed = $insert->execute($mode,$serv,$name, $afp, $rev, $data) || &Error("Can't insert '$name' AfpJob in jobs_queue table: ". $DBH->errstr()); # Return the sid inserted if ($executed and $DBH->commit()) { $jid = &mysql_insert_id() ; &UPSTAT('A2PJOB-DB-INSERTED'); } elsif ($executed) { &Error("No jid retrieved for '$name' AfpJob insertion"); } &TIMESTAT('A2PJOB-DB-INSERT'); } return $jid ; } my $get_next_afpjob_select ; my $get_next_afpjob_count ; sub get_next_afpjob { my $mode = shift || 0 ; my $serviceid = shift || 0 ; return 0 unless (&a2p_db_connected); unless (defined($get_next_afpjob_select) and $get_next_afpjob_select) { $get_next_afpjob_count = 0 ; # Necessary to make the next SELECT working $DBH->commit(); if ($serviceid) { &UPSTAT('A2PJOB-DB-CHECK-SERVICEID-'.$serviceid); $get_next_afpjob_select = $DBH->prepare_cached( qq{ SELECT j.jid as JID, j.Mode as MODE, j.JobID as JOBID, j.JobName as JOBNAME, j.Revision as REV, j.Object as THAWN, j.dbtime as DBTIME FROM ${A2PDB_TABLE_PREFIX}jobs_queue j WHERE Mode=? AND serviceid=? ORDER BY JID DESC } ); $get_next_afpjob_select->execute($mode,$serviceid) or return $get_next_afpjob_select = &Error( "Can't get rows in jobs_queue table for serviceid $serviceid:" . $DBH->errstr()); } else { &UPSTAT('A2PJOB-DB-CHECK-SERVICEID-UNDEF'); $get_next_afpjob_select = $DBH->prepare_cached( qq{ SELECT j.jid as JID, j.Mode as MODE, j.JobID as JOBID, j.JobName as JOBNAME, j.Revision as REV, j.Object as THAWN, j.dbtime as DBTIME FROM ${A2PDB_TABLE_PREFIX}jobs_queue j WHERE Mode=? ORDER BY JID DESC } ); $get_next_afpjob_select->execute($mode) or return $get_next_afpjob_select = &Error( "Can't get rows in jobs_queue table in mode $mode: " . $DBH->errstr()); } } my $row = $get_next_afpjob_select->fetchrow_hashref ; unless (defined($row) and ref($row) =~ /^HASH/) { &UPSTAT('A2PJOB-DB-ALL-GOT'); &MAXSTAT('A2PJOB-DB-GET',$get_next_afpjob_count); if ($DBH->errstr) { &Error("Error when retrieving afpjob objects in jobs_queue table: ". $DBH->errstr()); } $get_next_afpjob_select->finish ; undef $get_next_afpjob_select ; # Avoid disconnection in a2p-status &a2p_db_disconnect unless ($serviceid); $row = 0 ; } return $row ; } sub a2p_db_avoid_disconnect_in_forked { # This sub must be called in forked processes to avoid closing parent # connections when forked process is exiting $DBH->{'InactiveDestroy'} = 1 if (defined($DBH) and $DBH); } ################################################################################ ### Module End ################################################################################ &Debug("Module " . __PACKAGE__ . " v$VERSION loaded"); 1;