#!/usr/bin/perl
#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id$
#
# Main process of the A2P service. In order, this script:
#   1. forks into the background at compile time (BEGIN block), unless
#      JAVA_HOME is set, and writes the child PID into the file named by the
#      first command line argument when one is given;
#   2. loads the A2P modules, detaches with setsid and checks its folders
#      and settings (InitCheck);
#   3. starts every module listed in %SonKind and loops on their messages
#      while children exist and $MUSTQUIT is not set;
#   4. asks the remaining children to stop, reaps them and resets the
#      PID file.
#

BEGIN {

    use strict ;
    use warnings ;
    use POSIX 'setsid' ;
    use A2P::Globals ;

    our $VERSION = sprintf "%s", q$Rev: 762 $ =~ /(\d[0-9.]+)\s+/ ;

    if (defined($ENV{JAVA_HOME}) and $ENV{JAVA_HOME}) {
        # Don't fork if launched by Eclipse, script launcher unsets JAVA_HOME
        $Progname = "a2p-eclipse" ;
        $MUSTQUIT = 1 ;

    } else {
        # Daemonize early
        my $forkpid = 0 ;
        our $argument = $ARGV[0] || "" ;

        chdir '/' or die "Can't chdir to /: $!";
        open *STDIN , '<', '/dev/null' or die "Can't read from /dev/null: $!";
        open *STDOUT, '>', $StdoutFile or die "Can't write to $StdoutFile: $!";
        defined($forkpid = fork) or die "Can't fork: $!";

        # Parent side: save the son PID in the file given as argument, if any
        if ( $forkpid > 0 ) {
            if ( length($argument) > 0 ) {
                open *PIDFILE , '>', $argument
                    or die "Can't open PID file '$argument': $!";
                print PIDFILE $forkpid ;
                close(PIDFILE);
            }
            # and leave our son alone
            exit(0);
        }

        # Son side: raise our nice value by 5 when it is below 10, which
        # lowers our scheduling priority
        my $priority = getpriority( 0, $$ );
        if ( $priority < 10 ) {
            setpriority( 0, 0, $priority + 5 )
                or die "Can't update priority: $!" ;
        }
    }
}

our $VERSION ;
our $argument ;
our $Progname ;
our $MUSTQUIT ;

#--------------------------------------------------------------------------
# Here we import any needed modules only after we are in background
#--------------------------------------------------------------------------
use strict ;
use warnings ;
use integer ;
use POSIX qw( :signal_h :sys_wait_h setsid );
use Time::HiRes qw( usleep gettimeofday tv_interval );
use A2P::Init ;
use A2P::Globals ;
use A2P::Globals qw( UpdateSharedEnv ) ;
use A2P::Syslog ;
use A2P::Com qw( GetCom IsCom comCOM comREQ comLOG comJOB comDONE );
use A2P::Signal ;
use A2P::threads ;
use A2P::threads qw( SetNewLogger ThreadsStarter ThreadsChecker THCountAll
    is_son THCountStopping FromToDoCom CheckChildrenQuitTimeOut
    is_really_logger threads_internal_updates THList threadSleep reduceSleep ) ;
use A2P::Thread ;
use A2P::Logger ;
use A2P::BackEnd ;
use A2P::Listener ;
use A2P::Archiver ;
use A2P::EService ;
use A2P::Converter ;
use A2P::JobManager ;
use A2P::SpoolManager ;
use A2P::Signal 'LogSigMessage' ;

&Debug(" Module load error: $! $? $@") if ( $! or $? or $@ );
&Debug(" =============================START==============================");

&setsid or die "Can't set a new session: $!";
open *STDERR, '>&', *STDOUT or die "Can't redirect STDERR to STDOUT: $!";

#-------------------------------------
# Here we are in a daemonized process
#-------------------------------------
&Debug("Current progname = $0, version $VERSION, started");
&Debug("Setting service name to $Progname");

# Update system name of this daemon to be pretty with ps command
$0 = $Progname ;

# Reset error code
$! = 0 ;

# Declarations in threads.pm
# Empty Sons/Threads lists and hashes, hash keys are processes PID
%sons    = () ;
%HasQuit = () ;
@answers = () ;
# Not referenced by this script itself; kept because the string eval of
# configuration updates below runs in this lexical scope
my %requests = () ;

#############################################################################
# InitCheck
#
# Validate and prepare the runtime environment. Called once at startup and
# again each time a configuration reload brings updates.
#   - requires the AFPSPOOL folder to exist, otherwise raises $MUSTQUIT;
#   - creates any missing working folder, one path component at a time;
#   - touches an init file in SERVICE_TMP, setting $MUSTQUIT on failure;
#   - forces MAXTASK >= 1 and MAX_BUFFER_SIZE >= 8192;
#   - checks the log file can be opened when file logging is enabled and
#     purges it when LOGFILE_PURGE is set;
#   - resets the debug settings and refreshes the threads internals.
#
# Takes no argument.
# Returns the incremented $MUSTQUIT when AFPSPOOL is missing, the value of
# &Error when the log file can't be opened, otherwise the value returned by
# &threads_internal_updates.
#############################################################################
sub InitCheck {
    # AFPSPOOL must be correctly defined and checked at any update
    unless ( -d $AFPSPOOL ) {
        &Alert("abterm: AFPSPOOL '$AFPSPOOL' folder doesn't exist");
        return ++$MUSTQUIT ;
    }

    # Check needed folders
    my %Folders = (
        LOCKID_FOLDER => $AFPSPOOL . '/.a2p',
        SERVICE_TMP   => $SERVICE_TMP,
        SHMDIR        => $SHMDIR,
        ERRORSPOOL    => $ERRORSPOOL,
        DONESPOOL     => $DONESPOOL,
        STATS_FOLDER  => $STATS_FOLDER
    );

    while ( my ($var, $Path) = each(%Folders) ) {
        next if ( -d $Path );

        &Info("Creating $Path folder as $var");

        # Rebuild the path component by component, creating missing levels
        my $Folder = "" ;
        foreach my $tmp_path ( split(m{[/]+}, $Path) ) {
            $Folder .= "/$tmp_path" ;
            mkdir $Folder , oct(775) unless ( -d $Folder );
            unless ( -d $Folder ) {
                &Error("Can't create '$Path' folder as $var: $!");
                last ;
            }
        }
    }

    # Initialize SERVICE_TMP folder
    if ( -d $SERVICE_TMP ) {
        # List form of system: the path is passed as is, without any shell
        unless ( system('touch', "$SERVICE_TMP/init_$Progname") == 0 ) {
            &Error("Can't initialize $SERVICE_TMP folder: $!");
            $MUSTQUIT = 1 ;
        }
    } else {
        &Error("Can't initialize unavailable $SERVICE_TMP folder: $!");
        $MUSTQUIT = 1 ;
    }

    # MAXTASK must be greater or equal to 1
    if ( $MAXTASK < 1 ) {
        &Info("Setting MAXTASK to 1 as it is lower: ".$MAXTASK);
        $MAXTASK = 1 ;
    }

    if ( $MAX_BUFFER_SIZE < 8192 ) {
        &Info("Setting MAX_BUFFER_SIZE to 8192 as it is lower: " .
            $MAX_BUFFER_SIZE );
        $MAX_BUFFER_SIZE = 8192 ;
    }

    # Test opening logfile in case of file logging
    if ( $LOGFILENAME and ( $LOGFILE_VS_SYSLOG or $DEBUG_IN_FILE )) {
        $! = 0 ;
        # Open in append mode so that this test never clobbers the current
        # log: the content is only dropped when LOGFILE_PURGE is set
        unless (open(*LOG, '>>', $LOGFILENAME)) {
            $LOGFILE_VS_SYSLOG = 0 ;
            $DEBUG_IN_FILE = 0 ;
            return &Error("Can't open '$LOGFILENAME': $!");
        }
        if ( $LOGFILE_PURGE ) {
            &UPSTAT('PURGE-LOG');
            truncate LOG , 0 ;
            $LOGFILE_PURGE = 0 ;
        }
        close(LOG);
    }

    # Reset Debug behaviour
    &ResetDebug() ;
    &threads_internal_updates();
}

################################################################################

&InitCheck ;

$STATS{'A2P-VERSION'} = A2P_RPM_VERSION ;
$maintid = $$ ;

# Start each kind of module, ThreadsStarter must return the expected count
foreach my $SonKindName ( keys(%SonKind) ) {
    if ( ! $MUSTQUIT
        and &ThreadsStarter( $SonKindName , 1 )
            != $SonKind{$SonKindName} * $MAXTASK )
    {
        &Error("Fatal error can't start a thread");
        $MUSTQUIT ++ ;
    }
}

&Debug(&THCountAll . " modules started");

&SetNewLogger(-1) if (&THCountAll); # First logger initialization

my ( $Get, @Temp, $MaxAnswers, $busyby, @RollChilds, @KeptAnswers );
my %LastGetRequests ;
my @MarkTimer = &gettimeofday() ;
$started = 0 ;
my %logtimeout = () ;
my $lastcheck = [ &gettimeofday() ];
my $SleepFactor = 1 ;

# Empty lists to rotate getanswers over children and to keep answers
@KeptAnswers = () ;
@RollChilds  = () ;
$MaxAnswers  = $COM_BURST ? 100 * $COM_BURST : 100 ;

# Hack to handle some rare cut answer
my @bad_answer = () ;

# Threads handler will loop only if threads are available
while ( keys(%sons) and ! $MUSTQUIT ) {
    &TIMESTAT('MainLoop');

    &LogSigMessage();

    A2P::Thread::self_TEST() if $do_test ;

    # Leave a mark in the log once 1800 seconds have elapsed since the last
    unless ( &tv_interval(\@MarkTimer) < 1800 ) {
        @MarkTimer = &gettimeofday() ;
        &Info("---MARK---");
    }

    #######################
    # Check communication
    #######################
    # First get any new Com from children
    # 1. Update our child list if empty
    @RollChilds = keys(%sons) unless @RollChilds ;

    # 2. Update answers list if not full and when we have child to getanswers
    while ( @RollChilds ) {
        my $OneSon = shift @RollChilds ;
        next unless &is_son($OneSon);
        # Skip a child read less than 2 seconds ago when the queue is full
        next if ( @answers > $MaxAnswers
            and defined($LastGetRequests{$OneSon})
            and &tv_interval($LastGetRequests{$OneSon}) < 2 );
        $LastGetRequests{$OneSon} = [ &gettimeofday() ];
        my @SonAnswers = $sons{$OneSon}->getAnswers() ;
        push @answers , @SonAnswers ;
        &UPSTAT('GETANSWERS', scalar(@SonAnswers));
    }

    # 3. Only sleep when we have nothing to tell or too few new requests
    #    have arrived
    &UPSTAT('LOOPS');
    if (@answers) {
        &reduceSleep() ;
    } else {
        &threadSleep() ;
    }

    # 4. Reintegrate last kept answers to the end only if we have not
    #    something more urgent to do
    #    By default we will read current answers
    if (@KeptAnswers) {
        $LastGetRequests{'KeptAnswers'} = [ &gettimeofday() ]
            unless (defined($LastGetRequests{'KeptAnswers'}));
        if ( ! @answers
            or &tv_interval($LastGetRequests{'KeptAnswers'})>= 1 )
        {
            push @answers, @KeptAnswers ;
            @KeptAnswers = () ;
            $LastGetRequests{'KeptAnswers'} = [ &gettimeofday() ];
        }
    }

    &MAXSTAT('answers-KEPT',scalar(@KeptAnswers));
    &MAXSTAT('answers-COUNT',scalar(@answers));

    # 5. Then check COM_BURST answers max
    my $max = $COM_BURST ;
    while ( $max -- and defined( $Get = shift @answers )) {
        chomp($Get);
        my ( $Id , $Req ) = &IsCom( comCOM , $Get );

        # 6. Empty COM should not exist but just in case
        unless ( $Get ) {
            &debugdev("Skipping unexpected empty COM '$Get'");
            next ;
        }

        # 7. Check $Get is really a COM
        unless ( defined($Id) and defined( $Req ) ) {
            if ($Get =~ /^[<]/) {
                my $ref ;
                &UPSTAT('COM-CHECK-FRAGMENT');
                # Control this com is still not handled
                if (($ref) = grep { $_->[0] eq $Get } @bad_answer ) {
                    # Don't keep too much time
                    next if (++ $ref->[1] < 100);
                } else {
                    # Will try to reassemble a fragmented com
                    push @bad_answer, [ $Get, 0 ] ;
                    next ;
                }

            } elsif (@bad_answer) {
                # Try to assemble and reinject immediately
                my $try = shift @bad_answer ;
                unshift @answers, $try->[0] . $Get ;
                &Info("Trying reassembled com: " . $try->[0] . $Get);
                &UPSTAT('COM-REASSEMBLED');
                next ;
            }

            &Error("[DEV] Can't understand answer '$Get'");
            &UPSTAT('COM-BAD-FRAGMENT');
            next ;
        }

        # 8. Check if we just got a number as COM shortcut
        if ( $Req =~ /^\d+$/ ) { # Got a scalar
            &UPSTAT('GETNUMCOM');

            if ( $Req == QUIT ) {
                &UPSTAT('GETQUITCOM');
                # Should now delete this thread from list
                if (&is_son($Id)) {
                    &Debug("Received QUIT from " . $sonname{$Id} );
                    $HasQuit{$Id} = $sonname{$Id} ;

                    # A thread has quit, so we must restart it unless
                    # we are quitting or we have asked it to stop
                    if ($sons{$Id}->StoppingSince()) {
                        $sons{$Id}->Delete();

                    } else {
                        my $kind = $sons{$Id}->getKind ;
                        $sons{$Id}->Delete();
                        unless ( $do_quit or $MUSTQUIT ) {
                            &ThreadsChecker( $kind );
                            &Info("Module $kind restarted");
                        }
                    }

                } else {
                    # Check threads later in case we need to restart a child
                    $do_check ++ ;
                    &Debug("Received QUIT from " . $sonname{$Id} .
                        ", should check what's happened");
                }

            } elsif (&is_son($Id)) {
                if ( $Req == PING ) {
                    &UPSTAT('GETPINGCOM');
                    &Debug("Received PING from " . $sonname{$Id} );
                    $sons{$Id}->GotPing();

                } elsif ( $Req == _INIT ) {
                    &UPSTAT('GETINITCOM');
                    &Debug("Received INIT from " . $sonname{$Id} );
                    $sons{$Id}->GotInit();

                } elsif ( $Req == comDONE ) {
                    &UPSTAT('GETCOMDONE');
                    &Debug("Received comDONE from " . $sonname{$Id} )
                        if $ADVANCED_DEBUGGING ;
                    $sons{$Id}->ResetRemoteBufSize();

                } else {
                    &UPSTAT('GETBADCOM');
                    &Error("[DEV] Don't understand com '$Get' as Req=" .
                        ( defined($means{$Req}) ? $means{$Req} : $Req ) .
                        " from " . $sonname{$Id} );
                }

            } elsif (defined($HasQuit{$Id})) {
                if ($HasQuit{$Id}) {
                    &UPSTAT('GETDEADCOM');
                    # Son still has QUIT and is deleted
                    &Debug("Received Req=" .
                        ( defined($means{$Req}) ? $means{$Req} : $Req ) .
                        " from dead thread T$Id" );

                } else {
                    &UPSTAT('GETBADCOM');
                    &Error("[DEV] Received Req=" .
                        ( defined($means{$Req}) ? $means{$Req} : $Req ) .
                        " for thread T$Id" );
                }

            } else {
                &UPSTAT('GETBADCOM');
                &Error("[DEV] Unsupported com '$Get' as Req=" .
                    ( defined($means{$Req}) ? $means{$Req} : $Req ) .
                    " from thread T$Id" );
            }

            # Numeric COM analysed so looping on next answer
            next ;
        }

        # 9. Manage log message:
        # 9.1. Enqueue log message if still have some
        # 9.2. Transmit to logger if running
        # 9.3. Do the log ourselves when logger is not available
        if ( @Temp = &IsCom( comLOG , $Req )) {
            &UPSTAT('GETLOGCOM');
            unless ( @Temp == 2 and $Temp[1] ) {
                &UPSTAT('BAD-LOGCOM');
                &Debug("Found bad log request '$Req'");
                next ;
            }

            if (&is_really_logger($loggertid)) {
                # 9.2. Just transmit log to logger
                $sons{$loggertid}->AskThread( TODO , $Req );

            } else {
                # 9.3. Finally try to log directly if Logger not available
                &SetNewLogger(0) if $loggertid ;
                &DirectLog( $Temp[0] , [ $Temp[1] ] , "MAIN:" );
            }

            next ;
        }

        # 10. Manage requests from ourself, JobManager and Listener
        if ( @Temp = &IsCom( comREQ , $Req )) {
            &UPSTAT('GETREQCOM');
            unless ( @Temp == 2 and $Temp[1] ) {
                &UPSTAT('GETBADREQCOM');
                &Debug("Found bad REQ request '$Req'");
                # Drop the malformed request as done for bad log requests,
                # rather than dispatching it with a missing part
                next ;
            }

            if (&is_son($Id) or $Id == $$) {
                &Debug((($Id == $$)? "I am" : $sonname{$Id} . " is" ) .
                    " requesting $Temp[1] to $Temp[0]");

                # if @threads list is empty we don't find available thread
                unless ( &FromToDoCom( $Id , @Temp ) ) {
                    &UPSTAT('BUSY-MODULE');
                    &Debug("No $Temp[0] thread available, keep request");
                    push @KeptAnswers , $Get ;
                }

            } elsif (defined($HasQuit{$Id})) {
                if ($HasQuit{$Id}) {
                    &UPSTAT('GETDEADREQCOM');
                    # Son still has QUIT and is deleted, still try to send it
                    # if it is for JobManager
                    if ($Temp[0] =~ /^JobManager$/) {
                        &Debug("Transmitting request $Temp[1] from dead " .
                            $sonname{$Id} . " to $Temp[0] thread as myself");
                        unshift @answers , &GetCom( comCOM, $$ => $Req );

                    } else {
                        &Debug("Won't transmit request $Temp[1] from dead " .
                            $sonname{$Id} . " to $Temp[0] thread");
                    }

                } else {
                    &UPSTAT('GETBADREQCOM');
                    &Error("Can't transmit request $Temp[1] from " .
                        $sonname{$Id} . " to $Temp[0] thread");
                }

            } else {
                &UPSTAT('GETBADREQCOM');
                &Error("[DEV] Unsupported req $Temp[1] from " .
                    $sonname{$Id} . " to $Temp[0]");
            }

            next ;
        }

        # 11. Com should come from still alive thread
        if ( ! &is_son($Id) ) {
            &Info("Forgetting com $Req as " . $sonname{$Id} . " is dead");

        # 12. Manage answers from busy threads BackEnd, SpoolManager,
        #     Converter and Archivage
        } elsif ( $busyby = $sons{$Id}->isBusyBy ) {
            # 12.a This must be an answer for another thread
            @Temp = &IsCom( comJOB , $Req );

            # 12.b Handle JOB comDONE
            if ( @Temp and &IsCom( comDONE , $Temp[1] )) {
                # Cancel business only when we receive a comDONE
                &Debug("JobDone received from " . $sonname{$Id} );
                # Thread is now ready to receive another request
                $sons{$Id}->setReady();

            # 12.c Check business is not lost
            } elsif (defined($HasQuit{$busyby})) {
                &UPSTAT('DEAD-REQUESTER');
                if ($HasQuit{$busyby}) {
                    # Son still has QUIT and is deleted
                    &Debug("Delete answer $Req from T$Id to dead $busyby");

                } else {
                    &Error("Delete request $Req from T$Id to ghost $busyby");
                }

            } elsif ( $busyby == $$ ) {
                # This is an answer for parent thread
                if ( $Req =~ /^started$/i and &is_son($Id)) {
                    if ($sons{$Id}->is('JobManager')) {
                        &Info("$Progname service v".A2P_RPM_VERSION." started");
                        &FromToDoCom( $$ , Listener => 'STARTED' )
                            or &Error("Can't tell Listener we are started");

                    } elsif ($sons{$Id}->is('Listener')) {
                        &Debug("Listener is informed we have started");

                    } else {
                        &debugdev("Bad STARTED answer from " . $sonname{$Id});
                    }
                    $sons{$Id}->setReady ;

                } else {
                    &debugdev("Found '$Req' answer from " . $sonname{$Id});
                }

            } else {
                # Report the requester this thread is busy for
                &debugdev("Bad answer $Req from T$Id and for T$busyby");
            }

            next ;
        }

        # We should never come there
        &UPSTAT('GETBADCOM');
        &debugdev("'$Req' from " . $sonname{$Id} . " no more supported");
        &debugdev("Next com to get is '$answers[0]'") if (defined($answers[0]));
        &debugdev("Then com will be '$answers[1]'") if (defined($answers[1]));
    } # Loop on COMBURST answers

    #######################
    # Check flags
    #######################
    # Quit only when no job is being converted, and we're not still quitting
    # Any Job in error will be lost
    if ($MAXTASK and $do_quit ) {
        if ( $do_quit == 1 ) {
            &Info("Waiting until threads have quit");

            # Divide USLEEP by 2 to accelerate communications
            $USLEEP >>= 1 ;

            # Firstly, ask JobManager to not do any more jobs by
            # setting its MAXJOBS env to 0, it should quit when lasts are done
            foreach my $tid ( &THList('JobManager') ) {
                $sons{$tid}->AskInit('$ENV{MAXJOBS} = 0');
                $sons{$tid}->AskInit('CAN_UPDATE_NOW');
            }

            # Secondly, ask threads to update USLEEP
            foreach my $tid ( keys(%sons) ) {
                $sons{$tid}->AskInit('$ENV{USLEEP} = ' . $USLEEP);
            }

            $do_quit ++ ;

        } elsif ( $do_quit == 2 ) {
            # Then, just check if JobManager has quit before switching
            # to normal quit
            $do_quit ++ unless &THList('JobManager');

        } else {
            # Normal quit by asking all processes to quit
            &Debug("Setting MAXTASK to 0");
            $MAXTASK = 0 ;

            &Debug("Asking threads to stop");
            foreach my $tid ( keys(%sons) ) {
                $sons{$tid}->AskToStop()
                    or &Info("Can't asking " . $sonname{$tid} . " to quit");
            }

            &SetNewLogger(0);

            # We can safely activate debugging when quitting is requested
            $NO_SYSLOG_DEBUG = 0 if $SERVICE_DEBUG ;
        }

    } elsif ( $do_quit > 4 ) {
        $MUSTQUIT ++ ;

    } elsif ( $do_quit ) {
        &CheckChildrenQuitTimeOut() ;
    }

    if ($Childquit) {
        my $pid = 0 ;

        &Debug("$Childquit SIGCHLD received");

        while ($Childquit > 0 and ! $MUSTQUIT
            and $pid = waitpid(-1,&WNOHANG))
        {
            &Debug("SIGCHLD with pid = $pid");

            last if ( $pid < 0 );

            if (defined($sons{$pid})) {
                $Childquit -- ;
                my $kind = $sons{$pid}->getKind ;
                my $isLogger = $sons{$pid}->is('Logger');

                # Get answers just in case we missed last QUIT
                push @answers , $sons{$pid}->getAnswers ;

                &Debug($kind."[".$sonname{$pid}."] returns with code ".($?>>8));

                &SetNewLogger(0) if $isLogger ;

                # We should better push the pid in a list to check it elsewhere
                $HasQuit{$pid} = $sonname{$pid} ;

                # Check if we need to restart the thread
                if ( ! $do_quit and ! $MUSTQUIT ) {
                    if (&ThreadsChecker( $kind )) {
                        &Info("Module $kind thread restarted");
                        &SetNewLogger(-1) if $isLogger ;

                    } else {
                        &Alert("Aborting as I can't restart $kind thread");
                        $do_quit ++ ;
                    }
                }
            }

            &Debug("T$pid has terminated");
        }

        $do_check ++ unless ( $Childquit > 0 or $do_quit or $MUSTQUIT );
        $Childquit = 0 ;
    }

    if ($MAXTASK and $do_init) { # >0 after HUP signal has been received
        $do_init = 0 ;

        # Reload configuration
        my @Updates = &Init() ;

        if (@Updates) {
            # First update ourself
            # NOTE(review): string eval of the update statements returned by
            # Init(); only safe as long as the configuration source is trusted
            eval "$_" foreach @Updates ;

            &UpdateSharedEnv ;

            &InitCheck ;

            # And transmit to children any modification
            foreach my $tid ( keys(%sons) ) {
                foreach my $update ( @Updates, 'CAN_UPDATE_NOW' ) {
                    $sons{$tid}->AskInit($update);
                }
            }

            # Here we can have to load some threads or
            # unload some so just check threads
            $do_check ++ ;
        }

        # Now we can start as threads are initialized
        if ( ! $started ++ ) {
            unless (&FromToDoCom( $$ , JobManager => 'START' )) {
                &Error("Can't ask JobManager to start, aborting");
                $do_quit ++ ;
            }
        }

        $MaxAnswers = $COM_BURST ? 1000 * $COM_BURST : 1000 ;
    }

    # Force a check when the last one is more than 5 seconds old
    $do_check ++ if ( &tv_interval($lastcheck) - 5 > 0 );

    if ($MAXTASK and $do_check and ! $do_quit and ! $MUSTQUIT) {
        $do_check = 0 ;
        $lastcheck = [ &gettimeofday() ];

        # Call checker only for threads flagged as to be checked
        foreach my $tid ( keys(%sons) ) {
            if ( $sons{$tid}->isToCheck ) {
                $do_ping ++ ;
                &Debug("Checking " . $sonname{$tid} . "...");
                $sons{$tid}->Check() if $sons{$tid}->isAlive();
            }
        }

        &Debug("No thread to check") unless $do_ping ;

        # Also check threads numbers
        &ThreadsChecker();
    }

    if ($MAXTASK and $do_ping) {
        foreach my $tid ( keys(%sons) ) {
            if ( $sons{$tid}->Ping() ) {
                &Debug("Ping " . $sonname{$tid} . " thread");
                $do_ping -- ;
            }
        }
    }
}

# Be sure to keep debugging information when stopping
$NO_SYSLOG_DEBUG = 0 ;

&LogSigMessage ;

# Do some check while quitting
&Debug(&THCountStopping . " modules stopping") if &THCountStopping;
if ( &THCountAll ) {
    &Debug(&THCountAll . " modules running, asking them to stop");
    foreach my $tid ( keys(%sons) ) {
        $sons{$tid}->AskToStop()
            or &Info("Can't asking " . $sonname{$tid} . " to quit");
    }
}

&Info("$Progname service is stopping");

# Wait until every child has quit
while ( &THCountAll or &THCountStopping ) {
    &CheckChildrenQuitTimeOut() unless $Childquit ;

    unless ( &tv_interval(\@MarkTimer) < 1 ) {
        @MarkTimer = &gettimeofday() ;
        &Debug("---QUITTING---");
    }

    &Debug("$Childquit SIGCHLD received") if $Childquit ;

    while ( $Childquit > 0 ) {
        $Childquit -- ;
        my $pid ;
        # Reap every terminated child which is one of our named sons
        while ( $pid = waitpid(-1,&WNOHANG) ) {
            last unless ( $pid > 0 and defined($sonname{$pid}));
            delete $sons{$pid};
            &Debug("$sonname{$pid} has terminated");
            $HasQuit{$pid} = $sonname{$pid};
        }
    }

    $Childquit = 0 if ( $Childquit < 0 );

    usleep $USLEEP ;
}

# Blocking wait for any child still not reaped
while ( my $pid = wait ) {
    last if ( $pid < 0 );
    delete $sons{$pid} if (defined($sons{$pid}));
    &Debug("T$pid has terminated");
}

unlink $LISTENER if ( -S $LISTENER );

# $argument is an empty string when no PID file was given on the command
# line, and undefined when we didn't daemonize: nothing to reset then
if (defined($argument) and length($argument)) {
    &Debug("PID file = '$argument'");

    # Delete our PID in PID file
    open *RET , '>', $argument
        or die "Can't open PID file '$argument': $!" ;
    print RET "0" ;
    close(RET);
}

&LogSigMessage ;

&Info("$Progname service stopped");

exit(0);

END {
    # Output statistics when quitting from a diagnostic service
    A2P::Thread::self_TEST() if ($Progname =~ /^diagnostics/);

    # $maintid is only set once the main process has passed InitCheck
    if ( defined($maintid) and $$ == $maintid ) {
        &Debug("=============================QUIT===============================");
    } else {
        &Debug($0 . "[$$] =====QUIT=====");
    }
}