#!/usr/bin/perl # # Copyright (c) 2004-2007 - Consultas, PKG.fr # # This file is part of A2P. # # A2P is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # A2P is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with A2P; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # # $Id$ # # Simple service to synchronize Job STATISTICS with a DB # BEGIN { use strict ; use warnings ; use POSIX 'setsid' ; use A2P::Globals ; our $VERSION = sprintf "%s", q$Rev: 415 $ =~ /(\d[0-9.]+)\s+/ ; if (defined($ENV{JAVA_HOME}) and $ENV{JAVA_HOME}) { # Don't fork if launch by Eclipse, script launcher unset JAVA_HOME $Progname = "a2p-eclipse" ; $MUSTQUIT = 1 ; } else { # Daemonize early my $forkpid = 0 ; our $argument = $ARGV[0] || "" ; chdir '/' or die "Can't chdir to /: $!"; open *STDIN , '<', '/dev/null' or die "Can't read from /dev/null: $!"; open *STDOUT, '>', $StdoutFile or die "Can't write to $StdoutFile: $!"; defined($forkpid = fork) or die "Can't fork: $!"; # Save son PID in system file provided as argument if file exists if ( $forkpid > 0 ) { if ( length($argument) > 0 ) { open *PIDFILE , '>', $argument or die "Can't open PID file '$argument': $!"; print PIDFILE $forkpid ; close(PIDFILE); } exit(0); } } } our $VERSION ; our $argument ; our $Progname ; our $MUSTQUIT ; #-------------------------------------------------------------------------- # Here we import any needed modules only after we are in background #-------------------------------------------------------------------------- use strict ; use warnings ; use Socket ; use IO::Socket ; use Fcntl ; use Fcntl qw(:flock); use GDBM_File ; use POSIX qw( :signal_h :sys_wait_h setsid ); use Time::HiRes qw( usleep gettimeofday tv_interval ); use A2P::Init ; use A2P::Globals; use A2P::Syslog ; use A2P::Signal ; use A2P::Thread ; use A2P::Signal 'LogSigMessage' ; use A2P::DB ; use A2P::DB qw( a2p_db_statistics_ready CheckJobsTables stats_set_service stats_jobmanager stats_converter check_commit ); &Debug(" Module load error: $! $? $@") if ( $! or $? or $@ ); &Debug(" =============================START=============================="); &setsid or die "Can't set a new session: $!"; open *STDERR, '>&', *STDOUT or die "Can't redirect STDERR to STDOUT: $!"; #------------------------------------- # Here we are in a daemonized process #------------------------------------- &Debug("Current progname = $0, version $VERSION, started"); &Debug("Setting service name to $Progname"); # Update system name of this daemon to be pretty with ps command $0 = $Progname ; # Reset error code $! = 0 ; # Here constant my $dbmbase = "statistics.dbm" ; # First forked process is the master my $MASTER = 1 ; # Empty Sons/Threads lists and hashs, hash keys are processes PID # PID -> Managed folder my %sons = () ; my $LCK ; my $UNIXSOCK ; my %FILES = () ; # Hash to tie my $pid ; ################################################################################ ### Only called from Main loop ################################################################################ sub CheckThreads { my %known = () ; map { $known{$_} = 0 } values(%sons) ; while ( $MASTER and @_ ) { $STATS_FOLDER = shift ; next unless (defined($STATS_FOLDER) and $STATS_FOLDER); # Skip still managed folder if (defined($known{$STATS_FOLDER})) { &Debug("Skipping still managed '$STATS_FOLDER' folder"); } else { # Set some file name my $unixname = $SERVICE_TMP . "/" . $dbmbase . '-' .$$. '_' .time ; my $dbmname = $STATS_FOLDER . "/." . $dbmbase ; my $lockname = $dbmname . ".LCK" ; # First try to lock the lock file if exists if ( -e $lockname ) { &Debug("Checking why lock file still exists..."); if (open($LCK,$lockname)) { if (flock($LCK, LOCK_EX | LOCK_NB)) { flock($LCK, LOCK_UN); close($LCK); &Debug("Removing $lockname lock file on '$STATS_FOLDER'" . " as it is not used"); unlink $lockname ; } else { &Debug("Still managed by other thread " . " as I can't lock on '$lockname': $!"); } } else { &Error("Can't open lock file on '$STATS_FOLDER': $!"); } } if ( -e $lockname and -e $dbmname ) { &Info("'$STATS_FOLDER' statistics manager seems still present"); $lockname = "" ; my $tied = tie( %FILES,'GDBM_File',$dbmname,GDBM_READER,0666 ); if (defined($tied)) { $pid = $FILES{'PID'} ; my $sockname = $FILES{'SOCKET'} ; if (socket(SOCK, PF_UNIX, SOCK_STREAM, 0)) { if (-S $sockname and connect(SOCK, pack_sockaddr_un($sockname))) { $SIG{'ALRM'} = sub { close(SOCK) } ; alarm 5 ; # Get response until 5 seconds kill 12 , $pid ; my @messages = ; alarm 0 ; close(SOCK); my $last = pop @messages || "" ; &Debug("Got '$last' message on '$sockname'"); $last =~ m|^$| ; if (defined($1) and time - $1 < 60) { &Info("'$STATS_FOLDER' statistics manager " . "still available as process $pid"); $sons{$pid} = $STATS_FOLDER ; } else { &Info("'$STATS_FOLDER' statistics manager seems" . " unavailable, trying to start another"); $lockname = $dbmname . ".LCK" ; } } elsif (-S $sockname) { &Error("Can't connect to '$sockname' socket: $!"); } else { &Info("'$STATS_FOLDER' statistics manager seems " . "not listening, trying to launch another one"); $lockname = $dbmname . ".LCK" ; } } else { &Error("Can't create an unix socket: $!"); } } else { &Error("Can't open tied file $dbmname for reading: $!"); } # Remove lock file if set again unlink $lockname if $lockname ; } if ($lockname) { &Info("Trying to start '$STATS_FOLDER' statistics manager"); if (open($LCK,">$lockname")) { if (flock($LCK, LOCK_EX | LOCK_NB)) { if (defined(tie ( %FILES, 'GDBM_File', $dbmname, GDBM_WRCREAT, 0666 ))) { # Now we can fork as pre-init is done $pid = fork ; if (defined($pid)) { if ($pid) { # In parent = master $sons{$pid} = $STATS_FOLDER ; untie %FILES ; close($LCK); &Debug("'$STATS_FOLDER' statistics manager" . "forked"); } else { # In statistics manager &InitStatistics($unixname); &Info("'$STATS_FOLDER' statistics manager" . " started"); } } else { &Error("$STATS_FOLDER manager not forked: $!"); untie %FILES ; flock($LCK, LOCK_UN); close($LCK); } } else { &Error("Can't open tied hash file '$dbmname': $!"); flock($LCK, LOCK_UN); close($LCK); } } else { &Error("Can't lock folder '$STATS_FOLDER'" . "on '$lockname': $!"); close($LCK); } } else { &Error("Can't open '$lockname' lock file: $!"); } } else { &Info("Can't start '$STATS_FOLDER' statistics manager"); } } # Now we know this folder is managed $known{$STATS_FOLDER} = 1 ; } if ($MASTER) { # Check to stop no more needed threads foreach my $folder ( grep { ! $known{$_} } keys(%known) ) { &Info("Stopping synchronization of statistics from '$folder'"); ( $pid ) = grep { $sons{$_} eq $folder } keys(%sons) ; next unless (defined($pid)); &Debug("Sending SIGTERM to T$pid"); kill 15 , $pid ; } } else { # Free not used memory in son undef %sons ; } } sub InitCheck { # Read conf $do_init = 0 ; &Init ; # List statistics folders my @STATS_FOLDERS = () ; if ( $STATS_FOLDER =~ /:/ ) { @STATS_FOLDERS = split(/:/,$STATS_FOLDER) ; } else { @STATS_FOLDERS = ( $STATS_FOLDER ) ; } # Check needed folders my %Folders = ( SERVICE_TMP => $SERVICE_TMP, SHMDIR => $SHMDIR ); while ( my ($var, $Path) = each(%Folders) ) { unless ( -d $Path ) { &Info("Creating $Path folder as $var"); my $Folder = "" ; foreach my $tmp_path ( split(m{[/]+}, $Path) ) { $Folder .= "/$tmp_path" ; mkdir $Folder , oct(775) unless ( -d $Folder ); &Error("Can't create '$Path' folder as $var: $!"), last unless ( -d $Folder ); } } } # Initialize SERVICE_TMP folder if ( -d $SERVICE_TMP ) { unless ( system("touch $SERVICE_TMP/init_$Progname") == 0 ) { &Error("Can't initialize $SERVICE_TMP folder: $!"); $MUSTQUIT = 1 ; } } else { &Error("Can't initialize unavailable $SERVICE_TMP folder: $!"); $MUSTQUIT = 1 ; } # Test opening logfile in case of file logging if ( $LOGFILENAME and ( $LOGFILE_VS_SYSLOG or $DEBUG_IN_FILE )) { $! = 0 ; unless (open(LOG,">$LOGFILENAME")) { $LOGFILE_VS_SYSLOG = 0 ; $DEBUG_IN_FILE = 0 ; return &Error("Can't open '$LOGFILENAME': $!"); } if ( $LOGFILE_PURGE ) { &UPSTAT('PURGE-LOG'); truncate LOG , 0 ; $LOGFILE_PURGE = 0 ; } close(LOG); } # Reset Debug comportement &ResetDebug() ; &CheckThreads(@STATS_FOLDERS); # @STATS_FOLDERS will be check in sons return @STATS_FOLDERS ; } ################################################################################ ### Only called from Forked loop ################################################################################ my %FH ; my %FH_TIMEOUT ; sub InitStatistics { my $unixname = shift ; $0 .= "-thread" ; $MASTER = 0 ; $SIG{'USR2'} = \&SigUSR2 ; $FILES{'PID'} = $$ ; if (&openUNIXSOCK($unixname)) { $FILES{'SOCKET'} = $unixname ; } else { $FILES{'SOCKET'} = "" ; } # Synchronise %FILES with %FH foreach my $file (keys(%FILES)) { next if ($file =~ /^(SOCKET|PID)$/); my $fullpathfile = "$STATS_FOLDER/$file" ; if ( -e $fullpathfile and ! -d $fullpathfile ) { $FH{$file} = 0 ; } else { delete $FILES{$file} ; delete $FH_TIMEOUT{$file} ; } } } # Do some statistics load timing my @LastLoadCheck = &gettimeofday() ; my $LoadedLines = 0 ; my $activity = 0 ; sub statistics { # List available files my @files = <$STATS_FOLDER/*-Converter-????????> ; push @files , <$STATS_FOLDER/*-JobManager-????????> ; &TIMESTAT('FilesControl'); for my $file (@files) { # Only keep files with Converter or JobManager # and only a date at the end of the name my @count = $file =~ m*$STATS_FOLDER/([\w-]+)-(Converter|JobManager)-(\d{8})$* ; unless ( ! -d $file and @count == 3 ) { &Debug("Skipping '$file' read") if ($ADVANCED_DEBUGGING); next ; } # Keep short filename only $file = join('-',@count) ; unless (defined($FH{$file})) { my ($service,$type,$date) = @count ; &Debug("Found new file to handle: $file for service " . $count[0] . ", generated on " .$count[2]); $FILES{$file} = $FH{$file} = 0 ; } # Check to return immediatly when requested return 0 if ( $do_quit or $MUSTQUIT); } &TIMESTAT('FilesControl'); # Lines counter to load my $line_nb = 0 ; # Read files, get an array of new lines by file my %newstats = () ; &TIMESTAT('FilesRead'); foreach my $file (keys(%FH)) { my $fh = $FH{$file} ; # Don't read too much lines at a loop my $maxlines = $MAX_LINES ; my $stats_file = "$STATS_FOLDER/$file" ; # 0. Check file is still present unless ( $fh or -s $stats_file ) { &Debug("'$file' was deleted"); close($fh) if $fh ; delete $FH{$file} ; delete $FILES{$file} ; delete $FH_TIMEOUT{$file} ; next ; } # 1. Check old file then open not opened files if ($FILES{$file} == -s $stats_file) { # Only check to delete a file if no new line are found and one day # has past since file was closed or last checked if (defined($FH_TIMEOUT{$file}) and &tv_interval($FH_TIMEOUT{$file})>86400) { my $fileage = ( time - (stat($stats_file))[9] )/ 86400 ; if ( $fileage > $STATSFILE_MAXAGE ) { &Info("Deleting old statistics file: $stats_file"); unlink $stats_file or return &Error("Can't delete old statistics file: $!"); close($FH{$file}) if $FH{$file} ; delete $FH{$file} ; delete $FILES{$file} ; delete $FH_TIMEOUT{$file} ; } else { # Next check in one day $FH_TIMEOUT{$file} = [ &gettimeofday() ] ; } } # Handles a timer on opened file to only keep few opened files if ($FH{$file}) { if (defined($FH_TIMEOUT{$file})) { # Close filehandle after 30 minutes if not used if (&tv_interval($FH_TIMEOUT{$file}) > 1800) { close($FH{$file}) ; $FH{$file} = 0 ; delete $FH_TIMEOUT{$file} ; } } else { $FH_TIMEOUT{$file} = [ &gettimeofday() ] ; } } # Anyway loop to another file next ; } elsif ( ! ( $fh = $FH{$file} )) { undef $fh ; # Handle must be undefined when opening on it return &Error("Can't open '$file' for reading: $!") unless ( open $fh , $stats_file ); $FH{$file} = $fh ; delete $FH_TIMEOUT{$file} ; } # 2. Seek to skip still handled lines seek $fh, $FILES{$file}, 0 if ($fh and $FILES{$file} != tell($fh)); # 3. Read lines locking on the file to not read while it is written unless (eof($fh)) { my $locked = flock($fh, LOCK_EX); next unless ($locked); my $line = '' ; while ( ! eof($fh) and $maxlines-- > 0 ) { $line = readline($fh) ; last unless (defined($line) and $line); push @{$newstats{$file}} , $line ; $line_nb ++ ; } flock($fh, LOCK_UN); unless (@{$newstats{$file}}) { &Debug("No new line found in '$file'"); delete $newstats{$file} ; } # Check to don't read too much line at a time #last unless ($maxlines > 0 ); } # Check to return immediatly when requested return 0 if ( $do_quit or $MUSTQUIT); } &TIMESTAT('FilesRead'); if (keys(%newstats)) { &Debug("Starting DB update"); # Be sure we are connected to DB my $connected = &a2p_db_connected() ? 1 : &a2p_db_connect() ; return &Error("Not connected to db") unless ($connected); # Be sure DB is ready for statistics &a2p_db_statistics_ready() or return &Error("Db not ready for statistics"); # Reset activity meter $activity = 0 ; } else { $activity ++ unless ($activity > 100); # Still return if nothing to do return $activity > 10 ? $activity : 10 ; } foreach my $stat (keys(%newstats)) { my ( $service, $proc , $date ) = $stat =~ /^(.*)-(JobManager|Converter)-(\d+)$/ ; unless (defined($proc) and $proc and defined($date) and $date) { &Error("Won't handle lines from unrecognized '$stat' file"); delete $newstats{$stat} ; next ; } &stats_set_service($service); scalar(@{$newstats{$stat}}) ; &Debug("Have " . $line_nb . " lines to analyse in $stat") if ( $line_nb > 100 ); foreach my $line (@{$newstats{$stat}}) { chomp $line ; # Skip headers lines unless ( $line =~ /^(\d{8});(\d{6});/ ) { $line_nb -- ; next ; } my @Job = split(/;/,$line) ; if ($proc eq 'JobManager' and scalar(@Job) > 8) { &stats_jobmanager(\@Job) or return &Error("Can't manage '$line' JobManager " . "statistics line as (@Job) array from $stat file"); } elsif ($proc eq 'Converter' and scalar(@Job) > 12) { &stats_converter(\@Job) or return &Error("Can't manage '$line' Converter " . "statistics line as (@Job) array from $stat file"); } else { &Info("Can't handle '$line' as bad formatted '$proc' line " . "from $stat file"); next ; } # Check statistics loading rate each 30 seconds $LoadedLines ++ ; if (&tv_interval(\@LastLoadCheck) > 30) { &Info(sprintf( "%d more lines loaded: Loading rate %.2f lines per second" . " (now on %s)", $LoadedLines, $LoadedLines/&tv_interval(\@LastLoadCheck), $stat)) if ($LoadedLines>100); @LastLoadCheck = &gettimeofday() ; $LoadedLines = 0 ; } # Check to return immediatly when requested return 0 if ( $do_quit or $MUSTQUIT); # Check to commit after a delay, and also cache control &check_commit(); } # Update %FILES to current file position after DB has been updated $FILES{$stat} = tell($FH{$stat}) ; } &Debug("DB update finished") ; # Here every thing has been done return 1 ; } sub openUNIXSOCK { my $sockname = shift ; &Debug("Creating '$sockname' unix socket"); # Open UNIX Socket socket( $UNIXSOCK , PF_UNIX, SOCK_STREAM , 0 ) or return &Error("Can't create an unix socket: $!"); bless $UNIXSOCK , "IO::Socket" ; unlink $sockname ; bind( $UNIXSOCK , pack_sockaddr_un( $sockname ) ) or return &Error("Can't bind unix socket on '$sockname': $!"); listen( $UNIXSOCK , SOMAXCONN ) or return &Error("Can't listenn on unix socket '$sockname': $!"); &Debug("'$sockname' unix socket created"); 1 ; } sub SigUSR2 { if (defined($UNIXSOCK)) { my $socket ; $UNIXSOCK->blocking(0); my $addr = accept($socket,$UNIXSOCK); if ( $addr ) { &Debug("Got connection on $socket socket"); bless $socket , "IO::Socket" ; $socket->autoflush(1); my $msg = "" ; print $socket "$msg\n" ; close($socket); &Debug("Printed '$msg' on $socket socket"); } else { &Warn("No client connection on SIGUSR2 received"); } } else { &Warn("Can't say I'm active"); } } ################################################################################ ### Pre-init ################################################################################ &Info("$Progname service v" . A2P_RPM_VERSION . " starting"); my @MarkTimer = &gettimeofday() ; $STATS{'A2PSTAT-VERSION'} = A2P_RPM_VERSION ; &InitCheck ; # This sub forks other managed statistics folders ################################################################################ ### Main loop ################################################################################ my $started_check = $MASTER ? 10 : 0 ; my %restart = () ; my $factor = 0 ; &TIMESTAT('MainLoop'); while ( ! $MUSTQUIT and (! $MASTER or keys(%sons))) { if ($started_check) { &Info("$Progname service v" . A2P_RPM_VERSION . " started") unless (--$started_check); } &LogSigMessage(); if ($do_test) { if ($MASTER) { $STATS{'#### ID ####'} = "MASTER-$$" ; kill 10, keys(%sons) ; } else { $STATS{'#### ID ####'} = "THREAD-$$ on $STATS_FOLDER" ; } A2P::Thread::self_TEST(); } @MarkTimer = &gettimeofday() , &Info("---MARK---") unless ( ! $MASTER or &tv_interval(\@MarkTimer) < 1800 ); ########################### # Statistics handling code ########################### last unless ($MASTER or $factor = &statistics); # Check also stopped thread in master while ( $MASTER and $pid = waitpid(-1,&WNOHANG) ) { last if ( $pid < 0 ); &Debug("Process $pid returned"); if (defined($sons{$pid})) { my $folder = $sons{$pid} ; if ($?>>8) { &Warn("$folder statistics manager returned with code ".($?>>8)); } else { &Info("$folder statistics manager returned"); } my @folders = values(%sons) ; delete $sons{$pid} ; # Check if we need to restart the thread unless ( $do_quit or $started_check ) { &CheckThreads(@folders); &Info("'$folder' statistics manager restarted"); my $now = time ; if (defined($restart{$folder})) { # Reset restart statistics if older than a minute $restart{$folder} = [ 0 , $now ] if (time - $restart{$folder}->[1] > 60); } else { $restart{$folder} = [ 0 , $now ] ; } $restart{$folder}->[0] ++ ; my $last = $now - $restart{$folder}->[1] ; if ( $last and $restart{$folder}->[0]/$last > 2 ) { &Debug(sprintf("Restart rate = %.2f" , $restart{$folder}->[0]/$last)); &Alert("Can't manage '$folder' statistics folder: " . "Too much restart"); $do_quit ++ ; } } } else { &Warn("Unknown process $pid returned"); } } ####################### # Check flags ####################### # true after QUIT/TERM signal has been received last if ( $do_quit ); # true after HUP signal has been received if ( $do_init ) { my $temp = $STATS_FOLDER ; # Will re-check and keep STATS_FOLDER in sons # Still quit if our STATS_FOLDER not in the current last unless (grep { /^$temp$/ } &InitCheck()); if ($MASTER) { map { &Debug("Sending HUP signal to $_ thread"); kill 1, $_ ; } keys(%sons) ; &Info("Configuration reloaded"); } else { $STATS_FOLDER = $temp ; } } # Check to do last commits &check_commit($factor); $factor = 1 unless ($factor); &TIMESTAT('MainLoop'); usleep $factor * $USLEEP ; &TIMESTAT('MainLoop'); } # We can safely activate debugging when quitting is requested $NO_SYSLOG_DEBUG = 0 if $SERVICE_DEBUG ; &LogSigMessage ; if ( $MASTER ) { &Info("$Progname service is stopping"); &CheckThreads(); # This will kill sons with SIGTERM } else { # Remove unix socket close($UNIXSOCK); unlink $FILES{'SOCKET'} ; &a2p_db_disconnect(); # Free DBM file and statistics folder untie %FILES ; flock($LCK, LOCK_UN); close($LCK); } # Wait until every child has quit while ( $pid = wait ) { last if ( $pid < 0 ); &Debug("T$pid has terminated"); } &LogSigMessage ; if ($MASTER) { if (defined($argument)) { &Debug("PID file = '$argument'"); # Delete our PID in PID file if (open RET , ">$argument") { print RET "0" ; close(RET); } else { &Error("Can't open PID file '$argument': $!"); } } &Info("$Progname service stopped"); } else { &Info("Synchronization of statistics from '$STATS_FOLDER' stopped"); } exit(0); END { # Output statistics when quitting from a disgnostic service A2P::Thread::self_TEST() if ($Progname =~ /^diagnostics/); $MASTER ? &Debug("=============================QUIT===============================") : &Debug($0 . "[$$] =====QUIT====="); &LogSigMessage ; }