#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: threads.pm 3 2007-10-18 16:20:19Z guillaume $
#

package A2P::threads;

use strict ;
use integer ;

# UNIVERSAL must not be asked to export anything: recent perl releases die
# on "use UNIVERSAL 'isa'". UNIVERSAL::isa() is called fully qualified instead.
use UNIVERSAL () ;
use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
use A2P::Globals ;
use A2P::Syslog ;
use A2P::Signal ;
use A2P::Com qw( IsCom comSAY );

BEGIN {
    use Exporter ();
    our ( $VERSION , @ISA , @EXPORT , @EXPORT_OK );

    $VERSION = sprintf "%s", q$Rev: 490 $ =~ /(\d[0-9.]+)\s+/ ;

    @ISA = qw(Exporter);
    @EXPORT = qw( %sons %sonname %SonKind %HasQuit @answers $started );
    @EXPORT_OK = qw(
        &THCountAll &THCountStopping &is_son &FromToDoCom &SetNewLogger
        &ThreadsStarter &ThreadsChecker &CheckChildrenQuitTimeOut
        &is_really_logger &threads_internal_updates &THList
        &threadSleep &reduceSleep
    );
}
our $VERSION ;

#############################################################################
## Declaration and vars update                                             ##
#############################################################################

# Thread kind => thread factor. The number of threads wanted for a kind is
# $MAXTASK multiplied by this factor (see ThreadsStarter/ThreadsChecker).
our %SonKind = ();

# Sons/Threads and Quit status hash list, keys are processes PID
our %sons ;         # tid => thread object
our %sonname ;      # tid => thread name as returned by getName()
our %HasQuit ;      # tid => 0 while running, else the marked name
my  %LastRestart ;  # kind => [ gettimeofday ] of the last (re)start attempt

# Children answers list
our @answers ;

# tid of the logger currently selected by SetNewLogger, 0 when none
my $currentlogger = 0 ;

our $started = 0 ;

# Restart flood control used by ThreadsChecker
my $controlrestart = 0 ;
my $restartcheck = [ &gettimeofday() ] ;

# Rebuild the %SonKind factor table from the current configuration values.
# Should be called at any conf reload request by the parent
sub threads_internal_updates {
    %SonKind = (
        #######################################################################
        #   Thread Alias           Thread Factor
        #######################################################################
        'JobManager'   => 1,
        'SpoolManager' => $SCAN_SPOOL ? 1 : 0,
        'Listener'     => 1,
        'Logger'       => $START_LOGGER,
        'Converter'    => $MAXJOBS,
        'BackEnd'      => $MAXJOBS * $BACKEND_FACTOR,
        'Archiver'     => $ARCH_ENABLED < 100 ?
                            $BACKEND_FACTOR * $ARCH_ENABLED : $BACKEND_FACTOR,
        'EService'     => $ESERVICE_ENABLED < 100 ?
                            $BACKEND_FACTOR * $ESERVICE_ENABLED : $BACKEND_FACTOR
        #######################################################################
    );
}

&threads_internal_updates();

#############################################################################
## Exported tools used in any process                                      ##
#############################################################################

# Multiplier applied to $USLEEP by threadSleep, only grows when
# $DYNAMIC_USLEEP is set
my $SleepFactor = 1 ;

# Sleep for $SleepFactor * $USLEEP microseconds, or only $USLEEP/2 when
# $do_quit is set. With $DYNAMIC_USLEEP, statistics are updated and the
# factor is incremented until the sleep reaches $MAX_DYNAMIC_USLEEP.
sub threadSleep {
    # Short sleep (USLEEP/2) when quitting
    return usleep( $USLEEP >> 1 ) if ($do_quit);

    usleep( $SleepFactor * $USLEEP );

    if ($DYNAMIC_USLEEP) {
        &UPSTAT('LOOPS-SLEEP');
        &UPSTAT('LOOPS-SKIPPED',$SleepFactor-1);

        # Update SleepFactor and its monitoring
        &MAXSTAT('SLEEP-FACTOR',++ $SleepFactor)
            unless ( $SleepFactor * $USLEEP >= $MAX_DYNAMIC_USLEEP );
    }
}

# Quickly shrink the dynamic sleep factor back toward 1. Does nothing unless
# $DYNAMIC_USLEEP is set and the factor has grown above 1.
# Any argument disables the statistics update.
sub reduceSleep {
    return unless ( $DYNAMIC_USLEEP and $SleepFactor > 1 );

    # Keep stats on SleepFactor when it has grown, unless called with any value
    &UPSTAT('MAXSLEEPFACTOR-REACHED-'.$SleepFactor) unless @_ ;

    # Decrease rapidly SleepFactor by binary division until reach 1
    # Greater than 4, we divide by 2^2=4 else by 2^1=2
    $SleepFactor >>= ( $SleepFactor > 4 ? 2 : $SleepFactor > 1 );
}

#############################################################################
## Exported tools used in parent                                           ##
#############################################################################

# Thread check mini subs

# List the tids in %sons whose object answers true to is(KIND)
sub THList {
    return grep { $sons{$_}->is($_[0]) } keys(%sons) ;
}

# Same as THList, restricted to tids not flagged in %HasQuit
sub THAlive {
    return grep { ! $HasQuit{$_} } &THList($_[0]) ;
}

# Count the sons for which StoppingSince() is false
sub THCountAll {
    return scalar(grep { ! $sons{$_}->StoppingSince() } keys(%sons));
}

# Count the sons for which StoppingSince() is true
sub THCountStopping {
    return scalar(grep { $sons{$_}->StoppingSince() } keys(%sons));
}

# Tell if TID is a key of %sons holding an A2P::<Kind> object whose <Kind>
# is known in %SonKind. Returns 0 when the tid is unknown or its class does
# not match.
sub is_son {
    return 0 unless (defined($sons{$_[0]}));

    # Capture in list context so a failed match can never leave us reading a
    # stale $1 set by an earlier successful match
    my ( $kind ) = ref($sons{$_[0]}) =~ /^A2P::(.*)$/ ;

    return defined($kind) ? defined($SonKind{$kind}) : 0 ;
}

# Tell if TID is one of our sons and this son is a Logger
sub is_really_logger {
    # '&&' is required here: with the low precedence 'and', "return" would
    # grab only the is_son() result and the Logger test would never be run
    return &is_son($_[0]) && $sons{$_[0]}->is('Logger') ;
}

# Transmit a TODO request to one available thread.
# Arguments:
#   reqid   - id of the requester, kept with setBusy() so the next answer
#             can be routed back
#   kind    - a thread kind name, or a numeric tid to target one thread
#   @_      - remaining arguments are the request itself
# Returns 1 when the request has been transmitted, 0 when no thread could
# take it.
sub FromToDoCom {
    my $reqid = shift ;
    my $kind  = shift ;

    # A numeric kind is a tid: keep it only if we really know that son,
    # otherwise we would call a method on an undefined value below
    my @threads = $kind =~ /^\d+$/ ?
        ( grep { defined($sons{$_}) } $kind ) : &THAlive($kind) ;

    &Error("No such $kind thread for a request") unless @threads ;

    # Find one available thread to do the request
    while ( @threads ) {
        my $chance = int(rand(@threads));
        my $thread = shift @threads ;

        if ($chance) {
            # Choose randomly which process to try to not send request to the
            # same free process
            push @threads, $thread ;
            next ;
        }

        my $requester = $sons{$thread}->isBusyBy() ;
        if ($requester) {
            # A busy thread is dropped from the candidates list
            &Debug($sonname{$thread} . " still busy by " .
                $sonname{$requester},
                "Business is on '" . $sons{$thread}->getBusyReq() . "'");

        } else {
            # We don't want to busy JobManager and Listener unless by parent
            $reqid = 0 if ( $reqid != $$ and
                ($sons{$thread}->is('JobManager','Listener')));

            # Keep who generate the request to return next answer
            $sons{$thread}->setBusy( $reqid , @_ )
                unless ( ! $reqid or &IsCom( comSAY , $_[0] ));

            $sons{$thread}->AskThread( TODO , @_ );
            &Debug("Request transmitted to " . $sonname{$thread});

            return 1 ;
        }
    }

    return 0 ;
}

# Select the thread used as logger and tell every son to switch to it.
# Argument is optional; when given it replaces $loggertid.
# Returns the selected logger tid.
sub SetNewLogger {
    # Argument can be < 0 to initialize logger with current logger thread
    $loggertid = shift if @_ ;
    ( $loggertid ) = &THList('Logger') if ( $loggertid < 0 );

    # Nothing to do when the logger doesn't change
    return $loggertid if ( $loggertid == $currentlogger );

    &SetLogger( $sons{$loggertid} );

    foreach my $tid ( keys(%sons) ) {
        $sons{$tid}->SwitchLogger( $loggertid );
    }

    $currentlogger = $loggertid ;
}

#############################################################################
## Thread starter and checker code                                         ##
#############################################################################

# Bring the number of threads of one kind to $MAXTASK * $SonKind{kind}:
# start missing threads, ask threads in excess to stop, or clean objects of
# dead threads when the count is already right.
# Arguments:
#   Sub    - thread kind, the thread class is 'A2P::' . Sub
#   debug  - optional true value to get start messages logged
# Returns the list of tids of this kind not flagged in %HasQuit (so their
# count when called in scalar context).
sub ThreadsStarter {
    my $Sub = shift ;
    my $debug = $_[0] ? shift : 0 ;
    my $MaxTask = $MAXTASK * $SonKind{$Sub} ;
    my $Object = 'A2P::' . $Sub ;

    &Debug("Launching starter with sub '$Sub' and $MaxTask tasks");
    &Debug("Starting $MaxTask $Object thread") if ( $debug and $MaxTask );

    # Start threads if not enough threads are started
    if ( &THList($Sub) < $MaxTask ) {
        while ( &THList($Sub) < $MaxTask ) {
            &Debug("Creating new $Object thread object");
            my $Child = $Object->new ;

            if ( UNIVERSAL::isa( $Child , $Object )) {
                &Debug("$Child created");

            } else {
                &Error("Can't create $Object thread object");
                last ;
            }

            if ( $Child->init() ) {
                my $Tid = $Child->Fork();

                if (defined($Tid)) {
                    &Debug("$Sub thread started with TID $Tid");
                    $sons{$Tid}    = $Child ;
                    $sonname{$Tid} = $Child->getName ;
                    $HasQuit{$Tid} = 0 ;

                } else {
                    &Error("Can't start $Sub thread");
                    last ;
                }

            } else {
                &Error("Can't initialize $Sub thread");
                last ;
            }
        }

    # Or ask threads to check if one can quit
    } elsif ( &THList($Sub) > $MaxTask ) {
        my @Childs = &THList($Sub) ;
        my $count = scalar(@Childs) - $MaxTask ;

        # NOTE(review): THList also lists sons already flagged in %HasQuit,
        # so a son may be asked to stop more than once - confirm this is
        # wanted
        while ( $count -- ) {
            my $child = shift @Childs ;
            &Debug("Asking T$child $Sub thread to stop");
            $sons{$child}->AskToStop();

            # Don't keep as logger a thread which is going to stop
            &SetNewLogger(0) if ( $sons{$child}->getKind() =~ /logger/i );

            # A '*' at the end of the name marks a thread asked to stop
            $sonname{$child} .= '*' ;
            $HasQuit{$child} = $sonname{$child} ;
        }

    # Or we are checking a thread kind as it can be dead
    } elsif ($MaxTask) {
        foreach my $tid ( &THList($Sub) ) {
            $sons{$tid}->Delete() unless $sons{$tid}->isAlive() ;
        }

        &Debug("Still have the right number ($MaxTask) of $Sub thread")
            if ( &THList($Sub) == $MaxTask );
    }

    my @list = &THAlive($Sub) ;

    &Debug( scalar(@list) . " $Object thread started")
        if ( $debug and $MaxTask );

    return @list ;
}

# Check the number of alive threads for the given kinds, or for every kind
# known in %SonKind when called without argument, and start or stop threads
# when it doesn't match the need.
# Returns 1 when every processed kind has reached its needed count, else 0.
sub ThreadsChecker {
    my $Err = 0 ;

    # Update SpoolManager need if necessary. The factor is normalized to 0/1
    # as done in threads_internal_updates, so a $SCAN_SPOOL value greater
    # than 1 can't multiply the number of SpoolManager threads.
    my $spoolfactor = $SCAN_SPOOL ? 1 : 0 ;
    if ( $SonKind{SpoolManager} != $spoolfactor ) {
        &Debug(($SCAN_SPOOL ? "Activating" : "Suspending") .
            " SpoolManager thread");
        $SonKind{SpoolManager} = $spoolfactor ;
    }

    foreach my $kind ( @_ ? @_ : keys(%SonKind) ) {
        my $need = $MAXTASK * $SonKind{$kind} ;
        my $todo = $need - &THAlive($kind) ;

        # Nothing to do for this kind when the count is the expected one
        next unless $todo ;

        my $nextmessage ;
        if ( $todo < 0 ) {
            &Debug("Shutting down " . abs($todo) .
                " $kind threads to reach $need");
            $nextmessage = "Failed to stop threads" ;

        } else {
            &Alert("Restarting $todo $kind threads to reach ".($need>1?"$need ones":"one"));
            $nextmessage = "Failed to start threads" ;

            # Restart flood control: the counter is checked and reset as soon
            # as at least 2 seconds have passed since the last reset.
            # NOTE(review): the counter is also incremented when the start is
            # delayed just below - confirm this is wanted
            $controlrestart ++ ;
            if (&tv_interval($restartcheck) >= 2 ) {
                if ( $controlrestart > 5 ) {
                    &Alert("Stopping service as too much process restart ($controlrestart>5) occurs in the last 2 sec");
                    $MUSTQUIT ++ ;
                }
                $controlrestart = 0 ;
                $restartcheck = [ &gettimeofday() ] ;
            }
        }

        # Don't try again to start a process too quickly
        if (defined($LastRestart{$kind})) {
            unless ( &tv_interval($LastRestart{$kind}) >= 5 ) {
                &Debug("Will delay the start of $kind thread");
                next ;
            }
            &Debug("Previous $kind thread was started " .
                &tv_interval($LastRestart{$kind}) . " sec ago");
        }
        $LastRestart{$kind} = [ &gettimeofday() ];

        my $count = &ThreadsStarter( $kind );

        # Try again one time as first call could have only clean the hash
        $count = &ThreadsStarter( $kind ) unless ( $count == $need );

        unless ( $count == $need ) {
            &Warn($nextmessage) ;
            &Warn("Only " . &THAlive($kind) . " threads are availables");
            $Err ++ ;
        }
    }

    return $Err ? 0 : 1 ;
}

# Time references used to limit the debug message of
# CheckChildrenQuitTimeOut to one per second
my ( $firstcheck , $lastcheck ) = ( [ &gettimeofday() ] , [ ] );

# Make the children quit: each son still alive and not flagged in %HasQuit
# is sent a SIGTERM or, when SendSigTerm() returns false, a SIGKILL. Any
# other son has its answers collected in @answers and its object deleted.
# A '+' at the end of the name marks a killed or gone son.
# No meaningful value is returned.
sub CheckChildrenQuitTimeOut {
    if ( @{$lastcheck} or &tv_interval($firstcheck) >= 1 ) {
        if ( @{$lastcheck} and &tv_interval($lastcheck) >= 1 ) {
            # Report at most once per second the children still there
            &Debug((&THCountAll() + &THCountStopping()) .
                " childrens seems to be alive since " .
                &tv_interval($firstcheck));
            $lastcheck = [ &gettimeofday() ];

        } elsif ( ! @{$lastcheck} ) {
            # First pass: (re)start the time references from now
            $firstcheck = [ &gettimeofday() ];
            $lastcheck  = [ &gettimeofday() ];
        }
    }

    # The keys list is computed before the loop starts, so it is not
    # affected by whatever Delete() does to the thread object
    foreach my $tid ( keys(%sons) ) {
        if ( ! $HasQuit{$tid} and $sons{$tid}->isAlive() ) {
            if ($sons{$tid}->SendSigTerm()) {
                &Debug("Sending SIGTERM to children T$tid");

            } elsif ($sons{$tid}->SendSigKill()) {
                &Debug("Sending SIGKILL to children T$tid");
                $sons{$tid}->Delete() ;
                $sonname{$tid} .= '+' ;
                $HasQuit{$tid} = $sonname{$tid} ;
            }

        } else {
            &Debug("Children T$tid has leaved now");
            push @answers , $sons{$tid}->getAnswers() ;
            $sons{$tid}->Delete() ;
            $sonname{$tid} .= '+' ;
            $HasQuit{$tid} = $sonname{$tid} ;
        }
    }

    return ;
}

&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");

1;