#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: Thread.pm 3 2007-10-18 16:20:19Z guillaume $
#

package A2P::Thread;

use strict ;
use Socket ;
use Errno qw(:POSIX);
use Fcntl ':flock' ;
use IO::Socket ;
use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
use POSIX qw(:signal_h :sys_wait_h setsid) ;
use A2P::Globals ;
use A2P::Syslog ;
use A2P::threads ;
use A2P::threads qw( threadSleep reduceSleep ) ;
use A2P::Signal ;
use A2P::Signal qw( LogSigMessage ) ;
use A2P::Globals qw( UpdateSharedEnv ) ;
use A2P::Com qw( comREQ comCOM comJOB comUPD comDONE comINF comXML comTEST
                 IsCom GetCom TakeComLocking ReleaseComLocking GetTmpFile );
use A2P::JobStatus 'a2pjobstate' ;

BEGIN {
    our $VERSION = sprintf "%s", q$Rev: 1399 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;

# Hashes used for communications, all keyed by the stringified socket:
#   %ReadBuffer  lines already read from a socket but not yet returned
#   %lockfh      COM locking file handles (also keyed by lock file name)
#   %SocketProc  base name of the COM lock file of a socket
#   %BUFLEN      count of chars written to a socket since its last reset
my %ReadBuffer = () ;
my %lockfh = () ;
my %SocketProc = () ;
my %BUFLEN = () ;

# Object currently running ForkedLoop() in this process, used by END
my $THIS ;

# Constructor: builds the thread object. The kind is the class name with
# its leading 'A2P::' removed.
sub new {
    my $class = shift ;
    &Debug("new $class v$VERSION");

    $class =~ /^A2P::(.*)$/ ;

    my $self = {
        KIND       => $1,
        NAME       => $Progname . '-' . $1 ,
        SIGTERM    => 0,
        SIGKILL    => 0,
        TOPING     => 0,
        LOGGER     => 0,
        BUSY       => [ 0, "" ],
        DO_QUIT    => 0,
        DO_STAT    => [ &gettimeofday() ],
        CanPING    => [ &gettimeofday() ],
        HAVE_STATS => 0,
        STATS      => [],
        DONTSLEEP  => 0,
        URGENTCOM  => 0,
        COMBUFFER  => [],
        JOB        => ""
    };

    return bless $self , $class ;
}

# Checks the thread kind is known in %SonKind and opens the socket pair
# used between parent ({SON} end) and child ({DADDY} end).
# Returns 1 on success, otherwise the (false) result of ThreadError().
sub init {
    my $self = shift ;
    &Debug("new A2P::Thread is " . $self->getName);

    return $self->ThreadError("1, '$self->{KIND}' is not a valid thread kind")
        unless defined($SonKind{$self->{KIND}}) ;

    socketpair( $self->{SON}, $self->{DADDY}, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
        or return $self->ThreadError("2, Can't open socket pair for " .
            $self->{KIND} . " thread: $!");

    bless $self->{SON} , "IO::Socket" ;
    bless $self->{DADDY} , "IO::Socket" ;

    &Debug($self->getName . " object is ready");

    1 ;
}

# Optional hash ref of abort messages indexed by error number
our $AbortMessages ;

# Default leading messages for ABTERM(): none. Can be overridden.
sub defautABTERMError {
    return () ;
}

# Reports an abnormal termination. The first argument is used as error
# number when it is numeric, otherwise the caller line number is used.
# When $AbortMessages has a message for that number it is used (as a
# sprintf format with the remaining arguments for numbers >= 100),
# otherwise the remaining arguments are the messages. Always returns 0.
sub ABTERM {
    my $self = shift ;
    my ( $package, $filename, $line ) = caller ;
    my $Err = $_[0] =~ /^\d+$/ ? shift : $line ;

    @AbortTime = &gettimeofday();

    my @ERR = $self->defautABTERMError() ;

    push @ERR, (defined($AbortMessages) and defined($AbortMessages->{$Err}))?
        ( $Err < 100 ? $AbortMessages->{$Err} :
            sprintf( $AbortMessages->{$Err} , @_ ) )
        : @_ ;

    &Error( @ERR );

    # Return information to JobManager if JOB is defined and we are not
    # the JobManager
    map { $self->Return( $self->{JOB} => &GetCom( comINF, 'ABTERM', $_ )) } @ERR
        if (defined($self->{JOB}) and ! $self->is('JobManager'));

    map { &a2pjobstate( $self->{JOB}, $self->{STEP}, 'A', { ABTERM => $_ } ) } @ERR
        if (defined($self->{JOB}) and defined($self->{STEP}));

    &UPSTAT('JOB-ABTERM');

    0 ;
}

# Cosmetic function returning {BUSY} without its first element, joined
# with spaces, for later debugging
sub getBusyReq {
    my $self = shift ;
    my @business = @{$self->{BUSY}};
    shift @business ;
    return "@business" ;
}

# Replaces the content of {BUSY} with the given list
sub setBusy {
    my $self = shift ;
    @{$self->{BUSY}} = @_ ;
    &UPSTAT('SET-BUSY-SON') if ( $_[0] );
}

# With an argument: true when the first {BUSY} element numerically equals
# it. Without argument: returns the first {BUSY} element.
sub isBusyBy {
    my $self = shift ;
    # {BUSY} is initialized to [ 0 , "" ] at object creation
    if ( @_ ) {
        return $self->{BUSY}->[0] == $_[0] ;
    } else {
        return $self->{BUSY}->[0];
    }
}

# Resets {BUSY} to its initial [ 0 , "" ] value
sub setReady {
    &UPSTAT('SET-READY-SON');
    $_[0]->setBusy( 0, "" );
}

# Debugging helper: appends the non-empty statistics, the back-logs and
# some object arrays to log files under /tmp. Resets $do_test.
sub self_TEST {
    my $self = shift ;

    $do_test = 0 ;

    # @TT is left empty on purpose: tv_interval() then measures from 0,
    # so subtracting $^T gives the time elapsed since the script start
    our @TT ;
    $STATS{'RUNNING'} = sprintf("%.01fs", &tv_interval(\@TT) - $^T);

    # Output current memorized dynamic statistics
    my @stats = grep { $STATS{$_} } sort(keys(%STATS)) ;
    my $tag = $maintid ? "-$maintid" : "" ;
    if (@stats) {
        open my $log, ">>", "/tmp/$Progname$tag-selftest.log"
            or return &Error("Can't open log file in /tmp for self-test: $!");

        # Don't check anything as this is not critical
        flock($log, LOCK_EX);
        foreach my $stat ( map { sprintf("%50s %s", $_, $STATS{$_}) } @stats ) {
            print $log $stat, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }

    # Output current memorized log lines
    my @logs = &getBackLogs();
    if ( @logs ) {
        open my $log, ">>", "/tmp/$Progname$tag-selfdebug.log"
            or return &Error("Can't open log file in /tmp for self-debug: $!");

        flock($log, LOCK_EX);
        foreach my $debug ( @logs ) {
            print $log $debug, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }

    # Output other things
    # The list must be parenthesised: 'foreach my $x qw(...)' is a syntax
    # error since perl 5.18
    foreach my $Array ( qw( REQUESTS_ARRAY ) ) {
        next unless (defined($self->{$Array}));

        # Work on a copy as array refs are flattened below
        my @Array = @{$self->{$Array}} ;
        next unless (@Array);

        my $ii = 0 ;
        # Zero-pad indexes on the number of digits of the element count
        my $li = "[%0" . length(scalar(@Array)) . "d]: " ;

        open my $log, ">>", "/tmp/$Progname$tag-self$Array.log"
            or return &Error("Can't open log file in /tmp for " . $Array .
                " array: $!");

        flock($log, LOCK_EX);
        foreach my $val ( @Array ) {
            $val = ref($val) =~ /^ARRAY/i ? join('',@{$val}) : $val ;
            print $log $Array, sprintf($li,$ii++), $val, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
}

# Removes this object from the %sons list
sub Delete {
    my $self = shift ;
    &Debug("Deleting T$self->{TID} thread from sons list")
        unless (defined($self->{TOCLEAN}));
    $sons{$self->{TID}} = 0 ;
    delete $sons{$self->{TID}} ;
    undef $self
}

# Logs an error prefixed by the object name, the caller file base name
# and the caller line. Always returns 0.
sub ThisError {
    my ( $package, $filename, $line ) = caller ;
    $filename =~ m|([^/]+)$| ;
    &Error($_[0]->getName . ": from $1, L.$line: " . $_[1]);
    0 ;
}

# Default loop debugging: dumps the current %BUFLEN content
sub commonloopdebug {
    my $self = shift ;
    &Debug("Current BUFLEN:", map { "$_ -> $BUFLEN{$_}" } keys(%BUFLEN));
}

# loopdebug is an alias of commonloopdebug by default
sub loopdebug ;
*loopdebug = \&commonloopdebug ;

# Keeps the given error text in {ERROR}. Always returns 0.
sub ThreadError {
    $_[0]->{ERROR} = $_[1] ;
    0 ;
}

# Flags the object so Delete() and DESTROY() skip their reporting
sub setToClean {
    $_[0]->{TOCLEAN} = 1 ;
}

# Asks the thread to use the given TID as logger. Does nothing (returns
# 0) when that logger is already set or when the thread has no TID.
sub SwitchLogger {
    # Only called from parent
    return 0 if ( $_[0]->{LOGGER} == $_[1] or ! $_[0]->{TID} );
    &Debug("Switching logger to T$_[1] for " . $_[0]->getName);
    $_[0]->{LOGGER} = $_[1] ;
    $_[0]->AskThread( _INIT , "$_[1]" );
}

# Accessors on the object name and kind
sub getName {
    return $_[0]->{NAME} ;
}

sub getKind {
    return $_[0]->{KIND} ;
}

# Returns the arguments which are equal to the object kind, so this is
# true when one of the given kinds is our kind. The object itself is
# part of the tested list but its stringified form is not expected to
# match a kind.
sub is {
    return grep { $_[0]->getKind() eq $_ } @_ ;
}

# True (1) once Fork() has set {FORKED}
sub isForked {
    return defined($_[0]->{FORKED}) ? 1 : 0 ;
}

# Checks with signal 0 that the {TID} process exists. Returns 0 or 1.
sub isAlive {
    return 0 unless ( $_[0]->{TID} );
    my $test = 0 ;
    $test ++ if (kill 0 , $_[0]->{TID});
    # TODO: check errno (p. 371 of "linux PT" in the original note)
    &Debug( $_[0]->getName . ( $test ? " seems alive" : " seems dead" ) );
    return $test ;
}

# True when the thread is not quitting, has not been checked for more
# than 15 seconds and can be pinged (see CanPing())
sub isToCheck {
    return 0 if (defined($_[0]->{QUITTING}));
    # A thread is to check if it has been checked more than 15 seconds ago
    return $_[0]->LastCheck() > 15 ? $_[0]->CanPing() : 0 ;
}

# Memorizes the statistics given as arguments (through myStats()) and
# appends the memorized ones to one file per date in $STATS_FOLDER.
# Does nothing unless {HAVE_STATS} and $KEEP_STATS are set and the folder
# exists.
sub KeepStats {
    my $self = shift ;

    return unless ( $self->{HAVE_STATS} and $KEEP_STATS and -d $STATS_FOLDER );

    if (@_) {
        my @time = localtime ;
        push @{$self->{STATS}}, [
            sprintf('%4d%02d%02d', 1900+$time[5], $time[4]+1, $time[3]),
            sprintf('%02d%02d%02d', $time[2], $time[1], $time[0]) ,
            $self->myStats(\@_)
        ];
    }

    # Only keep stat after at least 1 second has passed, and there's some
    # But keep output stats if we are quitting
    return unless ( @{$self->{STATS}} and
        ($self->{DO_QUIT} or &tv_interval($self->{DO_STAT}) > 1));

    # Locked file handles indexed by stats file name
    my %Locked = () ;

    foreach my $Stat ( @{$self->{STATS}} ) {
        # Compute filename
        my $statfile = $self->getName() . '-' . $Stat->[0] ;

        unless ( $Locked{$statfile} ) {
            # A new lexical handle is needed for each file: opening again
            # an already used handle would close the previous stats file
            open my $fh, '>>', "$STATS_FOLDER/$statfile"
                or return $self->ThreadError("Can't open stats file '" .
                    $statfile . "': $!");

            # Try exclusive lock
            my $locked = flock($fh, LOCK_EX);
            unless ($locked) {
                close($fh);
                return $self->ThreadError("Can't lock stats file '" .
                    $statfile . "': $!");
            }

            if ( defined($self->{STATHEADERS}) and @{$self->{STATHEADERS}} ) {
                # Force again headers output after a restart, in case they
                # have changed
                print $fh join(';', @{$self->{STATHEADERS}}), "\n"
                    or return $self->ThreadError("Can't output in file '" .
                        $statfile . "': $!");
                @{$self->{STATHEADERS}} = () ;
            }

            $Locked{$statfile} = $fh ;
        }

        my $File = $Locked{$statfile} ;
        print $File join(';', @{$Stat}), "\n"
            or return $self->ThreadError("Can't output in statfile '" .
                $statfile . "': $!");
    }

    $self->{STATS} = [ ] ;

    # Unlock and close stat files
    foreach my $statfile ( keys(%Locked) ) {
        flock($Locked{$statfile}, LOCK_UN) and close($Locked{$statfile}) ;
    }

    # Reset stats timer
    $self->{DO_STAT} = [ &gettimeofday() ];
}

# Seconds elapsed since {CHECKED} was set, or 100 when never checked
sub LastCheck {
    return defined($_[0]->{CHECKED}) ? &tv_interval($_[0]->{CHECKED}) : 100 ;
}

# Counts one more pending ping in {TOPING} and resets the check and ping
# timers. An Info message is logged each time the count is a multiple
# of 4.
sub Check {
    $_[0]->{TOPING} ++ ;
    # A thread is really busy if it has not said anything since 60 seconds
    # (see isToCheck())
    &Info($_[0]->getName . " thread seems busy since " .
        sprintf("%.2f s",$_[0]->{TOPING}))
        unless ( $_[0]->{TOPING} and $_[0]->{TOPING} % 4 );
    $_[0]->{CHECKED} = [ &gettimeofday() ];
    $_[0]->{CanPING} = [ &gettimeofday() ];
}

# Sends a PING request when one is pending and CanPing() allows it
sub Ping {
    $_[0]->AskThread( PING ) if ( $_[0]->{TOPING} and $_[0]->CanPing() );
}

# Accounts a received ping answer and resets the check and ping timers
sub GotPing {
    $_[0]->{CHECKED} = [ &gettimeofday() ];
    $_[0]->{TOPING} -- ;
    $_[0]->{CanPING} = [ &gettimeofday() ];
}

# Returns 1, and restarts the ping timer, when the last ping was done
# more than 5 seconds ago. Returns 0 otherwise.
sub CanPing {
    # We can repeat a ping only if last ping was done more than 5 seconds ago
    return 0 unless (defined($_[0]->{CanPING}));
    if (&tv_interval($_[0]->{CanPING}) > 5) {
        $_[0]->{CanPING} = [ &gettimeofday() ] ;
        return 1 ;
    }
    0 ;
}

# Sends SIGTERM one time, when stopping since more than 5 and less than
# 30 seconds. Returns 0 when nothing was sent.
sub SendSigTerm {
    my $self = shift;
    my $since = $self->StoppingSince();
    return 0 unless ( $since > 5 and $since < 30 and ! $self->{SIGTERM} );
    &Debug("Killing " . $self->getName . " with SIGTERM");
    kill 15, $self->{TID} ;
    return ++ $self->{SIGTERM} ;
}

# Sends SIGKILL one time, when stopping since at least 30 seconds.
# Returns 0 when nothing was sent.
sub SendSigKill {
    my $self = shift;
    return 0 if ( $self->{SIGKILL} or $self->StoppingSince() < 30 );
    &Debug("Killing " . $self->getName . " with SIGKILL");
    kill 9, $self->{TID} ;
    return ++ $self->{SIGKILL} ;
}

################################################################################
##                               Fork code                                    ##
################################################################################

# Forks the thread process.
# In the parent: returns the child TID, or undef when the object was
# already forked or when fork failed.
# In the child: never returns, the process runs ForkedLoop() and exits
# with its result.
sub Fork {
    my $self = shift;

    # Security: Only one fork call can be done by created Thread object
    return undef if ($self->isForked());

    # Classical way to fork
    $self->{TID} = fork ;

    # Return to parent early if not forked
    return undef unless (defined($self->{TID}));

    $self->{FORKED} = [ &gettimeofday() ];

    # Clean SYSLOG socket
    if (defined($SYSLOG)) {
        shutdown($SYSLOG,2);
        $SYSLOG = undef ;
        &Debug("SYSLOG socket is reset");
    }

    # Return to parent
    if ( $self->{TID} ) {
        #============ In parent =============
        &Debug("Parent-to-child link is " . $self->{SON} );
        push @PIPE , $self->{SON} ;

        # Set the filename for COM locking
        $SocketProc{$self->{SON}} = "$maintid-" . $self->getKind()
            if $COM_LOCKING ;

        # Can define logger socket for parent
        &SetLogger( $self ) if ($self->is('Logger'));

        # Update object name as it is used in logging messages
        $self->{NAME} .= '[' . $self->{TID} . ']' ;

        # Return to caller in parent
        return $self->{TID} ;
    }

    #=== In child, we won't leave this object member ===

    # Clean error status
    $? = 0 ;
    $! = 0 ;

    # Clean open COM locking sockets and communications hashes
    if ( $COM_LOCKING ) {
        foreach my $key ( keys(%lockfh) ) {
            close($lockfh{$key}) ;
        }
        %SocketProc = () ;
        %lockfh = () ;
    }
    %ReadBuffer = () ;
    %BUFLEN = () ;

    # Clear stats
    foreach my $stat ( keys(%STATS) ) {
        $STATS{$stat} = 0 ;
    }

    # Update immediately signal handling,
    # Keep in mind these signal handlers won't be called as object member
    $SIG{'USR1'} = \&mySigUSR1 ;
    $SIG{'ALRM'} = \&mySigALRM ;
    $SIG{'TERM'} = \&mySigTERM ;
    $SIG{'QUIT'} = \&mySigTERM ;
    $SIG{'HUP'}  = \&mySigHUP ;
    $SIG{'ABRT'} = \&mySigTERM ;

    # Update child process name
    $0 = $self->getName ;        # Update process name with 'ps' command
    $Progname = $self->getName ; # For logging

    # Reduce priority as parent must be the one with the greater priority
    my $priority = getpriority( 0, $$ );
    if ( $priority < 15 ) {
        # Raise our nice value by 5
        setpriority( 0, 0, $priority + 5 )
            or &Warn("Can't update priority: $!");
    }

    $self->ThreadInitEarly();

    &Debug("Child " . $self->getName . " is starting...");

    &Debug("Child-to-parent link is " . $self->{DADDY} );
    push @PIPE , $self->{DADDY} ;

    # Cleaning this process from previously created objects
    foreach my $tid ( keys(%sons) ) {
        next unless ( $tid != $$ );
        &Debug("Cleaning unused T$tid cloned object during fork");
        $sons{$tid}->setToClean() ; # Just to avoid checking from DESTROY sub
        $sons{$tid}->Delete();
    }
    &Debug("Memory cleaned");

    &Debug("Starting " . $self->getName . " T$$");

    $self->ThreadInit();

    my $Ret = $self->ForkedLoop ;

    $loggertid = 0 ; # Force to log directly
    &LogSigMessage();

    &Debug($self->getName . " ended ($MUSTQUIT;$!;$?;$@)");
    &Debug("Forced to quit ($MUSTQUIT)") if $MUSTQUIT ;

    $self->Answer( QUIT );

    $self->KeepStats();

    $self->DoBeforeQuit();

    exit $Ret ;
}

# Requests starting with '<' which could not be interpreted, kept as
# [ request, retry count ] to try to reassemble a fragmented com
my @bad_answer = () ;
# Reassembled requests to be returned first by GetRequest()
my @KeepRequest = () ;

# Main loop of the forked process: reads up to $COM_BURST requests from
# the parent at each turn and handles them until $MUSTQUIT or {DO_QUIT}
# is set. Returns 1 when at least one error was counted, 0 otherwise.
sub ForkedLoop {
    my $self = $THIS = shift ;

    my $ErrCount = 0 ;
    my $request ;
    my $LoopTiming = [ &gettimeofday() ];
    my $PendingStatus = [ &gettimeofday() ];
    my $LoopCount = 0 ;

    $! = 0 ;
    while ( ! ( $MUSTQUIT or $self->{DO_QUIT} ) ) {
        &TIMESTAT('ForkedLoop');

        &LogSigMessage();

        # Check to commit any pending job status at least each minute
        if (&tv_interval($PendingStatus) > 60) {
            &a2pjobstate ;
            $PendingStatus = [ &gettimeofday() ];
        }

        &UPSTAT('LOOPS');

        $self->{DONTSLEEP} ? &reduceSleep('no stats') : &threadSleep() ;

        # Just to flush buffer
        $self->PrintSock($self->{DADDY});

        $self->KeepStats();

        # Test thread sub, can be useful while debugging
        $self->self_TEST() if $do_test ;

        my $max = $COM_BURST ;
        while ( $max-- and
            (defined( $request = $self->GetRequest($self->{DADDY})))) {

            last if ( $MUSTQUIT or $self->{DO_QUIT} );

            chomp $request ;

            my @Req = &IsCom( comREQ , $request );

            if ( @Req == 2 ) {
                if ( $Req[0] == QUIT ) { # Do I have to quit
                    &Debug("Notified to QUIT by parent");
                    $self->{DO_QUIT} = 1 ;

                } elsif ( $Req[0] == PING ) { # Do I have to ping
                    $self->Answer( PING );
                    &Debug("PING");

                } elsif ( $Req[0] == _INIT ) { # ENV update
                    $self->Answer( _INIT );

                    # Braces are escaped as unescaped literal braces are
                    # deprecated in regular expressions
                    if ( $Req[1] =~ m|^\s*(\$ENV\{\w+\}\s*=.*)$|i ) {
                        if ( $self->{clientrequest} ) {
                            # SECURITY issue:
                            # Only Listener is concerned: no external init
                            # accepted for now
                            $self->CloseClient( $self->{clientrequest}, 1,
                                "INIT is not supported from other application");

                        } else {
                            &Debug("Do INIT: $1");
                            # NOTE(review): string eval of a received
                            # request, only safe while requests can only
                            # come from the parent process
                            eval $1 ;
                        }

                    } elsif ( $Req[1] =~ m|^CAN_UPDATE_NOW$|i ) {
                        # Update shared vars with ENV definition
                        &UpdateSharedEnv ;

                        # Reset Debug behavior
                        &ResetDebug() ;

                        $self->InitUpdated();

                    } elsif ( $Req[1] =~ m|^(\d+)$|i ) {
                        if ( $1 == $$ ) {
                            &Debug("Approuved as logger");

                        } elsif ( $self->is('Logger') ) {
                            &Debug("Revoked as logger thread");
                            # We should quit as we are no more used for
                            # service
                            $self->{DO_QUIT} = 1 ;

                        } else {
                            &Debug("My logger is T$1") if $1 ;
                            $loggertid = $1 ;
                        }

                    } else {
                        $self->ThisError("Not able to evaluate '" . $Req[1] .
                            "' as ENV update");
                        $ErrCount ++ ;
                    }

                } elsif ( $Req[0] == TODO ) {
                    &Debug("Got 'TODO' COM: '$Req[1]'")
                        if $ADVANCED_DEBUGGING ;

                    $ErrCount ++ unless $self->Do( \$Req[1] );

                    # Called to do our job so we should sleep less next time
                    &reduceSleep();

                } else {
                    $self->ThisError("[DEV] Can't decide request " .
                        (defined($means{$request})?$means{$request}:$request));
                    $ErrCount ++ ;
                }

            # TODO Test me with also $Req[0] == _UPDATE
            } elsif ( @Req = &IsCom( comUPD , $request ) ) {
                if ( $Req[1] =~ m|^(\w+)=(["'])?(.*)\2$|i ) {
                    my ( $var , $value ) = ( $1 , $3 );
                    if (defined($1) and defined($3)) {
                        # NOTE(review): $var is used as an unanchored
                        # pattern, so any name found inside a @SHARED
                        # entry is accepted, and $value is evaluated
                        # between double quotes - confirm requests are
                        # always trusted here
                        if ( grep { /$var/ } @SHARED ) {
                            my $apos = $value =~ /^\d+$/ ? '' : '"' ;
                            eval '$' . $var . '=' . $apos . $value . $apos ;

                            # For important thread initialization
                            $self->InitUpdated();

                            &UPSTAT($var.'_DIRECT_UPDATE');

                        } else {
                            $self->ThisError("Not authorized to update '" .
                                $var . "' to '$value'");
                            &UPSTAT('UNAUTH_DIRECT_UPDATE');
                            $ErrCount ++ ;
                        }

                    } else {
                        $self->ThisError("Bad direct update request '$Req[1]'");
                        &UPSTAT('BAD_DIRECT_UPDATE');
                        $ErrCount ++ ;
                    }

                } else {
                    $self->ThisError("Bad formated update request '$Req[1]'");
                    &UPSTAT('BADFORMAT_UPDATE');
                    $ErrCount ++ ;
                }

            } elsif ( @Req = &IsCom( comCOM , $request ) ) {
                if ( $Req[1] == comDONE ) {
                    &UPSTAT('GETCOMDONE');
                    &Debug("Received comDONE from parent")
                        if $ADVANCED_DEBUGGING ;
                    $self->ResetRemoteBufSize($self->{DADDY});

                } else {
                    $self->ThisError("[DEV] Can't handle request '$request'");
                    $ErrCount ++ ;
                }

            # Handle XML request
            } elsif ( @Req = &IsCom( comXML , $request ) ) {
                &Debug("Got an XML request");

                # Check we have XML API and call it
                if (UNIVERSAL::can( $self , 'XML' )) {
                    $self->XML($request);

                } else {
                    $self->ThisError("[DEV] Not expected to handle '" .
                        $request . "' XML request");
                    $ErrCount ++ ;
                }

            } else {
                if ( $request =~ /^[<]/ ) {
                    my $ref ;
                    &UPSTAT('COM-CHECK-FRAGMENT');

                    # Control this com is still not handled
                    if (($ref) = grep { $_->[0] eq $request } @bad_answer ) {
                        # Don't keep too much time
                        next if (++ $ref->[1] < 100);

                    } else {
                        # Will try to reassemble a fragmented com
                        push @bad_answer, [ $request, 0 ] ;
                        $max ++ ;
                        next ;
                    }

                } elsif (@bad_answer) {
                    # Try to assemble and reinject immediately
                    my $try = shift @bad_answer ;
                    my $com = $try->[0] . $request ;
                    &Info("Trying reassembled com: $com");
                    push @KeepRequest, $com ;
                    &UPSTAT('COM-REASSEMBLED');
                    $max ++ ;
                    next ;
                }

                # This is only critical if not in Logger
                if ($self->getKind() eq 'Logger') {
                    &Warn("Critical error on " . $self->getName);
                } else {
                    &Alert("Critical error on " . $self->getName);
                }
                $self->ThisError("[DEV] Can't interpret request '$request'");
                $ErrCount ++ ;
            }
        } # Loop COM_BURST

        # Check parent is alive
        unless ( kill 0, $maintid ) {
            # TODO: check errno (p. 371 of "linux pt" in the original note)
            $loggertid = 0 ;
            &Warn("Quitting as parent seems dead");
            $MUSTQUIT = 1 ;
        }

        {
            # Rating the do loop
            no integer ;
            $LoopCount += 100 ;
            my $int = &tv_interval($LoopTiming);
            if ( $int >= 1 ) {
                # The rate must be computed before the counter is reset,
                # otherwise it is always 0
                &MAXSTAT('DOLOOP-RATE',int($LoopCount/$int)/100);
                $LoopCount = 0 ;
                $LoopTiming = [ &gettimeofday() ];
            }
        }

        if ( $ADVANCED_DEBUGGING ) {
            &Debug("Loop rate is " . $STATS{'DOLOOP-RATE'} );
            $self->loopdebug();
        }

        &TIMESTAT('ForkedLoop');
    } # Loop to read queue

    undef $THIS ;

    &Debug("$ErrCount counted errors") if $ErrCount ;

    return $ErrCount ? 1 : 0 ;
}

################################################################################
##                             End Fork code                                  ##
################################################################################

# Called in the child just before ForkedLoop()
sub ThreadInit {
    1 ; # Nothing to do by default
}

# Called early in the child, before it is cleaned from cloned objects
sub ThreadInitEarly {
    # Directly link Log answers to parent (defined in Syslog.pm)
    &SetLogger( $_[0] ); # This member must be overridden in Logger Object
}

# Called in the child after shared variables have been updated
sub InitUpdated {
    1 ; # Nothing to do by default
}

# Returns the next request to handle: a reassembled request if any,
# else 1 when quitting, else the next line read on the given socket
# (undef when there's none).
sub GetRequest {
    my $self = shift ;
    my $Requests = shift ;

    # Check if we got a fragmented not safe request
    return shift @KeepRequest if @KeepRequest ;

    return ( $MUSTQUIT or $self->{DO_QUIT} )? 1 : $self->ReadSock($Requests) ;
}

# Handles a TODO request given as scalar ref. Only logs it by default.
sub Do {
    my $self = shift ;
    my $ref = shift ;
    &Debug("Got to do '$$ref'");
}

# Called in the child just before exiting
sub DoBeforeQuit {
    1 ; # Nothing to do by default
}

# Signal handlers set in the child by Fork(). They only memorize a
# message and update some flag.
sub mySigUSR1 {
    push @SigDebug, "SIGUSR1 received in " . __PACKAGE__ ;
    $do_test ++ ;
}

sub mySigALRM {
    return unless ($SCAN_SPOOL);
    $do_file ++ if ( $do_file < $MAXTASK and $SCAN_SPOOL );
    push @SigDebug, "SIGALRM received (do_file=$do_file) in " . __PACKAGE__ ;
}

sub mySigTERM {
    my $ref = $_[0] eq 'ABRT' ? \@SigWarn : \@SigDebug ;
    push @{$ref}, "SIG$_[0] received in " . __PACKAGE__ ;
    $MUSTQUIT ++ ;
    $NO_SYSLOG_DEBUG = 0 if $SERVICE_DEBUG ;
}

sub mySigHUP {
    push @SigDebug, "SIGHUP received in " . __PACKAGE__ ;
}

sub mySigPIPE {
    my ($package, $filename, $line) = caller();
    push @SigDebug , "SIG$_[0] received at L. $line in $package: $! $?" ;
    $MUSTQUIT ++ unless ( grep { defined($_) } @PIPE );
}

# Returns the list to memorize as statistics from the given array ref.
# By default this is the array content.
sub myStats {
    my $self = shift ;
    return @{$_[0]} ;
}

# Sends a request built from the arguments (10 at most) to the child.
# Requests longer than 9999 chars are also dumped in a temporary file.
# Returns the PrintSock() result.
sub AskThread {
    my $self = shift ;
    my $Asking = $self->{SON} ;

    if (@_ > 10) {
        return &Error("[DEV] Too large argument for AskThread: @_");
    }

    my $msg = &GetCom( comREQ , @_ );

    {
        my $len = length($msg);
        &MAXSTAT('REQUEST-LEN', $len);
        if    ( $len < 50 )  { &UPSTAT('REQUEST-LEN-000-049'); }
        elsif ( $len < 100 ) { &UPSTAT('REQUEST-LEN-050-099'); }
        elsif ( $len < 200 ) { &UPSTAT('REQUEST-LEN-100-199'); }
        elsif ( $len < 500 ) { &UPSTAT('REQUEST-LEN-200-499'); }
        elsif ( $len < 1000) { &UPSTAT('REQUEST-LEN-500-999'); }
        else {
            &UPSTAT('REQUEST-LEN-1000+');
            if ( $len > 9999 ) {
                open my $log, '>', &GetTmpFile("$Progname-lastbigreq.log")
                    or return &Error("Can't open big req log file in TMP: $!");
                print $log $msg, "\n" ;
                close($log);
            }
        }
    }

    &UPSTAT("SENDLINES-".$self->{KIND});

    return $self->PrintSock( $Asking, $msg ) ;
}

# Answers to the parent the given job is done
sub AnswerDone {
    $_[0]->Answer( &GetCom( comJOB , $_[1] , &GetCom( comDONE )));
}

# Sends an urgent comDONE com: on {SON} in the main process, on {DADDY}
# in any other process
sub AnswerComDone {
    my $self = shift ;
    my $socket = $self->{SON} ;
    $socket = $self->{DADDY} unless ( $$ == $maintid );

    $self->{URGENTCOM} = 1 ;

    if (defined($socket)) {
        $! = 0 ;
        $self->ThisError("Can't answer comDONE".(($!)?": $!":""))
            unless ($self->PrintSock($socket,&GetCom( comCOM , $$ , comDONE )));

    } else {
        $self->ThisError("Can't answer comDONE: socket not available");
    }

    $self->{URGENTCOM} = 0 ;
}

# Answers to the parent a job com addressed to JobManager
sub Return {
    my $self = shift ;
    $self->Answer( &GetCom( comREQ , JobManager => &GetCom( comJOB , @_ )));
}

# Sends an answer to the parent on {DADDY}. Several arguments are first
# packed as a job com. A too long com is not sent but appended to a
# temporary 'badcom' log file. Returns 0 when PrintSock() failed.
sub Answer {
    my $self = shift ;
    my $Answers = $self->{DADDY} ;

    return &UPSTAT('Answer-ERROR') unless ( defined($Answers) and @_ );

    my $msg = &GetCom( comCOM, $$ => ( @_ > 1 ?
        &GetCom( comJOB, @_ ): $_[0] ));

    {
        my $len = length($msg);
        &MAXSTAT('Answer-MAXLEN',$len);
        if    ( $len < 50 )  { &UPSTAT('Answer-LEN-000-049'); }
        elsif ( $len < 100 ) { &UPSTAT('Answer-LEN-050-099'); }
        elsif ( $len < 200 ) { &UPSTAT('Answer-LEN-100-199'); }
        elsif ( $len < 500 ) { &UPSTAT('Answer-LEN-200-499'); }
        elsif ( $len < 1000) { &UPSTAT('Answer-LEN-500-999'); }
        else                 { &UPSTAT('Answer-LEN-1000+'); }

        if ( $len > $MAX_BUFFER_SIZE - 4096 ) {
            &UPSTAT("BADCOM-LEN-$len");
            open my $log, '>>', &GetTmpFile("$Progname-$maintid-badcom.log")
                or return &Error("Can't open badcom log file in TMP: $!");
            print $log $msg, "\n" ;
            close($log);
            return &Error("Won't send too long ($len) com");
        }
    }

    unless ($self->PrintSock($Answers, $msg )) {
        &Debug("Answer not sent, ".__PACKAGE__.", l. ".__LINE__." ($!-$?-$@)");
        return 0 ;
    }
}

# Sends a request to the parent. Does nothing without argument.
sub Request {
    my $self = shift;
    return unless @_ ;
    $self->Answer( &GetCom( comREQ , @_ ) );
}

# Reads up to $COM_BURST lines from the child and returns the non-empty
# ones. Should only be used by parent thread.
sub getAnswers {
    my $self = shift;
    my $max = $COM_BURST ;
    my @mesg = () ;

    # The definedness is tested, and not the truth, so an empty line is
    # really skipped below and doesn't stop the reading
    while ( $max-- and defined( my $msg = $self->ReadSock($self->{SON}) ) ) {
        chomp $msg ;
        next unless $msg ; # Skip empty lines

        {
            my $len = length($msg);
            &MAXSTAT('ANSWER-MAXLEN',$len);
            if    ( $len < 50 )  { &UPSTAT('ANSWER-LEN-000-049'); }
            elsif ( $len < 100 ) { &UPSTAT('ANSWER-LEN-050-099'); }
            elsif ( $len < 200 ) { &UPSTAT('ANSWER-LEN-100-199'); }
            elsif ( $len < 500 ) { &UPSTAT('ANSWER-LEN-200-499'); }
            elsif ( $len < 1000) { &UPSTAT('ANSWER-LEN-500-999'); }
            else                 { &UPSTAT('ANSWER-LEN-1000+'); }
        }

        push @mesg, $msg ;
    }

    # Update read line counter
    &UPSTAT("READLINES-".$self->{KIND},scalar(@mesg));

    return @mesg ;
}

# Asks the child to quit and starts the {QUITTING} timer. The request is
# retried while AskThread() returns a negative value, for half a second
# at most. Returns 1 when asked (or already asked), 0 otherwise.
sub AskToStop {
    my $self = shift;

    return 1 if ($self->StoppingSince());

    $self->{QUITTING} = [ &gettimeofday() ];

    while ( $self->AskThread( QUIT ) < 0 ) {
        usleep $USLEEP ;
        unless ($self->StoppingSince() < 0.5) {
            &Info("Can't ask to quit: $! - $? - $@");
            return 0;
        }
    }

    1 ;
}

# Sends an _INIT request with the given text and counts it in {_INIT}
sub AskInit {
    return $_[0]->ThreadError("1, can't ask to init '$_[1]'")
        unless ($_[0]->AskThread( _INIT , "$_[1]" ));
    $_[0]->{_INIT} ++ ;
}

# Accounts one answered _INIT request
sub GotInit {
    $_[0]->{_INIT} -- ;
    &Debug("Thread " . $_[0]->getName . " ENV updated")
        unless ( $_[0]->{_INIT} );
}

# Seconds elapsed since AskToStop() was called, or 0 if it was not
sub StoppingSince {
    return defined($_[0]->{QUITTING}) ?
        &tv_interval( $_[0]->{QUITTING} ) : 0 ;
}

# Sends log arguments: as a TODO request to the Logger child when in
# the main process, as a com to the parent otherwise. Returns 0 when
# nothing could be sent.
sub DoLog {
    my $self = shift ;
    &UPSTAT('DOLOG');

    my $good = 0 ;

    if ( $$ == $maintid ) {
        if ($self->is('Logger')) {
            $good = $self->PrintSock($self->{SON},
                &GetCom( comREQ, TODO, @_ ));

        } else {
            &debugdev("Not expected to handle '@_' here");
            &UPSTAT('DOLOG-ERROR');
        }

    } else {
        $good = $self->PrintSock($self->{DADDY}, &GetCom( comCOM , $$ , @_ ));
    }

    unless ($good) {
        &UPSTAT('BAD-DOLOG');
        return 0 ;
    }
}

# This is an END block (the 'sub' keyword is optional): when the process
# ends while still in ForkedLoop(), it tries to report memorized signal
# messages for the current job to the JobManager.
sub END {
    my $self = $THIS;

    if (defined($self)) {
        &Debug("Thread T$$ was " . ($self->{DO_QUIT}?"quitting ":"") .
            "on job $self->{JOB}") if $self->{JOB} ;

        # Return any recent Sig message when thread seems aborting
        # unexpectedly
        if ( $$ != $maintid and ! $self->{DO_QUIT}
            and ! $self->is('JobManager') and $self->{JOB}
            and defined($self->{DADDY}) and (@SigWarn or @SigInfo)) {

            # Try to advertize JobManager
            map { $self->Return( $self->{JOB} =>
                    &GetCom( comINF, Warning => $_ )) } @SigWarn ;
            map { $self->Return( $self->{JOB} =>
                    &GetCom( comINF, Info => $_ )) } @SigInfo ;
            map { $self->Return( $self->{JOB} =>
                    &GetCom( comINF, Debug => $_ )) } @SigDebug ;

            $self->Return( $self->{JOB} , 255 );
            $self->AnswerDone( $self->{JOB} );
            $self->{JOB} = "" ;
        }

        # Update loggertid to log directly
        &LogSigMessage();
    }
}

# Hook called first by DESTROY()
sub PreDESTROY {}

# Destructor: reports on the object unless it was flagged with
# setToClean(), kills the forked process if still alive and shuts down
# the sockets.
sub DESTROY {
    my $self = shift;

    $self->PreDESTROY();

    if (!defined($self->{TOCLEAN})) {
        # Return any recent Sig message when thread seems aborting
        # unexpectedly
        if ( $$ != $maintid and ! $self->{DO_QUIT}
            and ! $self->is('JobManager')
            and ( $self->{JOB} or !defined($self->{DADDY}) )
            and ( @SigWarn or @SigInfo ) ) {

            &Alert("T$$ seems aborted abnormally".(defined($self->{DADDY})?"":
                " with socket to parent lost"),
                @SigWarn, @SigInfo, @SigDebug );
        }

        # Update loggertid to log directly
        &LogSigMessage();

        # Debug age
        &Debug(sprintf("%s Age = %.2f s", $self->{NAME},
            &tv_interval( $self->{FORKED} ))) if ($self->isForked());

        # Must kill forked thread if alive
        if ( $self->isForked() and $self->isAlive() ) {
            &Debug("Killing thread object $self->{NAME}");
            kill 9, $self->{TID} ;
        }

        if (defined($self->{ERROR})) {
            &Error("Thread type object $self->{NAME} destroyed with error #" .
                $self->{ERROR});
        } else {
            &Debug("Thread object $self->{NAME} destroyed");
        }

        &Debug("Thread object $self->{NAME} status: $! - $? - $@")
            if ( $! or $? or $@ );
    }

    # Sockets are only shut down in the child or in the main process
    map { shutdown( $_, 2 ) }
        grep { defined($_) and (! $self->{TID} or $$ == $maintid) }
            ( $self->{SON} , $self->{DADDY} );

    &Debug(__PACKAGE__." T".($self->{TID}?$self->{TID}:$$)." memory cleaned");
}

############################### Dedicated socket communication members

# Reads, without blocking, what is available on the socket, keeps the
# read lines in the socket dedicated buffer and returns the first
# buffered line (undef when the buffer is empty). comTEST coms are not
# kept: they are immediately answered with AnswerComDone().
sub ReadSock {
    &TIMESTAT('ReadSock');
    my $self = shift ;
    my $Socket = shift ;
    my $fh = $lockfh{$Socket};

    @{$ReadBuffer{$Socket}} = () unless (defined($ReadBuffer{$Socket}));

    my $locked = 0 ;
    my $size = 0 ;
    my ( $bigeventcheck , $bigevent ) = ( 1 , 0 );
    my ( $sysreadbuffer, $buffer ) = ( "" , "" );

    $Socket->blocking(0);

    $locked = &TakeComLocking($fh) if ($COM_LOCKING and $maintid);

    $! = 0 ;
    # NOTE(review): as '+=' always gives a defined value, this condition
    # is always true and the loop is only left by the 'last' statements
    while (defined($size+=sysread($Socket, $sysreadbuffer, $MAX_BUFFER_SIZE))) {
        last unless $size ;

        $buffer .= $sysreadbuffer ;

        # bigread-event: Handle case we have fulfilled our read buffer, we
        # must complete with next read so loop to next sysread
        # Test modulo bigeventcheck to handle the case we fulfill more
        # than once
        # NOTE(review): with '%' this test looks like it can only pass
        # when the remainder is exactly $bigeventcheck - confirm a
        # division was not intended
        last unless ( $size % $MAX_BUFFER_SIZE == $bigeventcheck );

        &UPSTAT('BIGREAD-EVENT'.&SelfStatId($self,'full'));
        $bigeventcheck = ++ $bigevent ;
        $sysreadbuffer = "" ;
    }

    # Don't stats BUSY resource
    &UPSTAT('errno-'.int($!)."-$!") if ( $! and $! != 11 );

    if ($buffer) {
        my @lines = split(/\n/,$buffer);
        my $count = scalar(@lines) ;
        my $check = comTEST ;

        # Check buffer could contain a comTEST event as test could involve
        # performance issue
        if ( $buffer =~ /$check/ ) {
            my @test ;
            # Check really now there's a formated comTEST
            @lines = grep {
                @test = &IsCom( comCOM , $_ );
                $test[1] !~ /^$check$/
            } @lines ;

            if (@lines != $count ){
                &UPSTAT('GET-COMTEST'.&SelfStatId($self));
                &Debug("Received comTEST for " . $self->getName)
                    if $ADVANCED_DEBUGGING ;
                $self->AnswerComDone ;
            }
        }

        # Keep lines in socket dedicated buffer
        push @{$ReadBuffer{$Socket}} , @lines ;

        # Keep some monitoring scalar
        $STATS{'READ-LINES-DUMP'.&SelfStatId($self)} = "@lines" if $bigevent ;
        &MAXSTAT('MAX-READ-LINES',scalar(@lines));
        &MAXSTAT('MAX-READSIZE',$size);
        &MAXSTAT('MAX-READBUFFER-LINES',scalar(@{$ReadBuffer{$Socket}}));
    }

    &UPSTAT('READSOCK-ERR-'.$!.&SelfStatId($self,'full'))
        unless (defined($size));

    &ReleaseComLocking($fh) if ( $locked );

    &TIMESTAT('ReadSock');
    return shift @{$ReadBuffer{$Socket}} ;
}

# Sends the given lines on the socket with RealPrintSock(). When COM
# locking is enabled, the call is done between TakeComLocking() and
# ReleaseComLocking() on the lock file of the socket, which is opened
# only once by process.
sub PrintSock {
    my $self = shift ;
    &UPSTAT('PRINTSOCK-CALL'.&SelfStatId($self));
    my $Socket = shift ;
    my $locked = 0 ;

    # Return printing immediately unless we should use com locking
    unless ($COM_LOCKING and $maintid) {
        # maintid is always set after perl is loaded
        my $Ret = &RealPrintSock( $self, $Socket, @_ );
        return $Ret ;
    }

    # Only open lockfile one time by process, and keep it opened
    my $fh = $lockfh{$Socket} ;
    unless (defined($fh)) {
        $SocketProc{$Socket} = "$maintid-" . $self->getKind()
            unless (defined($SocketProc{$Socket}));

        my $lockfile = &GetTmpFile("$SocketProc{$Socket}.comlock");

        if (defined($lockfh{$lockfile})) {
            $lockfh{$Socket} = $lockfh{$lockfile} ;

        } else {
            # Create the lock file if it is missing
            unless ( -e $lockfile ) {
                open $lockfh{$Socket} , '>', $lockfile ;
                close($lockfh{$Socket});
            }

            open $lockfh{$Socket} , '+<', $lockfile
                or delete $lockfh{$Socket},
                    return &debugdev("Can't open '$lockfile' com lock file: $!");

            $lockfh{$lockfile} = $lockfh{$Socket} ;
        }

        $fh = $lockfh{$Socket} ;
    }

    $locked = &TakeComLocking($fh);

    my $Ret = &RealPrintSock( $self, $Socket, @_ );

    &ReleaseComLocking($fh) if ( $locked );

    #&TIMESTAT('PrintSock');
    return $Ret ;
}

# Adds the referenced lines to {COMBUFFER}: in front of it for an urgent
# com, at its end otherwise. Returns the buffer ref, or a ref on an
# empty array when the socket is undefined or when the remote buffer
# may be full.
sub Bufferize {
    my $self = shift ;
    my $Socket = shift ;

    return [] unless (defined($Socket));

    my $ref = shift ;

    my $bufname = 'BUFFER-LINES'.&SelfStatId($self) ;
    &MAXSTAT($bufname."-LINES",scalar(@{$self->{COMBUFFER}}));

    # Bufferize
    if ($self->{URGENTCOM}) {
        unshift @{$self->{COMBUFFER}} , @{$ref} ;
        return $self->{COMBUFFER} ;

    } else {
        push @{$self->{COMBUFFER}} , @{$ref} ;
    }

    # Test if remote buffer may be full to return empty array ref
    return &RemoteIsFull($self,$Socket) ? [] : $self->{COMBUFFER} ;
}

# Estimates the remote buffer use: chars already counted for the socket,
# plus the referenced lines if any, plus the first buffered line, plus a
# 4096 margin. Returns that length when it is over $MAX_BUFFER_SIZE,
# 0 otherwise and always 0 for an urgent com or an undefined socket.
sub RemoteIsFull {
    my $self = shift ;

    return 0 if ($self->{URGENTCOM});

    my $Socket = shift ;

    return 0 unless (defined($Socket));

    my $ref = shift ;

    local $" = '' ;
    my $len = defined($ref) ? length("@{$ref}") : 0 ;

    $len += length(@{$self->{COMBUFFER}}[0]) + 1
        if ( defined($self->{COMBUFFER}) and @{$self->{COMBUFFER}} );

    $len += $BUFLEN{$Socket} + 4096 ;

    if ( $len > $MAX_BUFFER_SIZE ) {
        &UPSTAT('REMOTE-FULL-BUFFER-CHECK'.&SelfStatId($self,'full'));
        return $len ;
    }

    return 0 ;
}

# Adds the given count to the chars counted as written on the socket
sub RemoteBufSizeUpdate {
    my $self = shift ;
    my $Socket = shift ;
    $BUFLEN{$Socket} += shift ;
    $STATS{'CUR-BUFFER-CONTROL'.&SelfStatId($self,'full')} = $BUFLEN{$Socket} ;
}

# Time of the last comTEST sent, indexed by socket
my %ComTest = () ;

# Resets the written chars counter and the comTEST timer of the socket,
# {SON} by default
sub ResetRemoteBufSize {
    my $self = shift ;
    my $Socket = shift ;
    $Socket = $self->{SON} unless (defined($Socket));
    $BUFLEN{$Socket} = 0 ;
    delete $ComTest{$Socket} if (defined($ComTest{$Socket}));
}

# Returns the suffix used in statistics names: empty outside the main
# process, otherwise '-' followed by the object name when an argument is
# given, or by the object kind.
sub SelfStatId {
    my $self = shift ;
    return "" unless ( defined($self) and $$ == $maintid );
    return '-' . ( @_ ? $self->getName : $self->getKind );
}

# Puts back the not sent lines in front of the buffer and returns their
# count
sub RetryLater {
    my $self = shift ;
    my $buffer = shift ;
    my $print = shift ;
    &UPSTAT('SOCK-RETRYLATER-EVENT'.&SelfStatId($self));
    unshift @{$buffer}, @{$print} ;
    return scalar(@{$print});
}

# Really writes the buffered lines on the socket, after the given ones
# have been bufferized. A comTEST com is added, at most once by second,
# when the remote buffer may be full.
# Returns 0 when nothing was written, a negative value when lines are
# still to be sent, otherwise the sent lines count (or the written chars
# count when no line was counted).
sub RealPrintSock {
    &TIMESTAT('RealPrintSock');
    my $self = shift ;
    my $Socket = shift ;

    # Below syntax is right but involve a bug
    #my $urgent = (defined($self) and exists($self->{URGENTCOM})) ?
    #    $self->{URGENTCOM} : 0 ;
    # This one is the last from a2p v1.020 without the bug
    # NOTE(review): as 'and' binds less than '=', $urgent only gets the
    # defined($self) result. This is deliberately kept as the current
    # behavior depends on it (see the comment above).
    my $urgent = defined($self) and $self->{URGENTCOM} ;

    my $count ;
    local $" = "\n" ;
    my ( $total , $lines , $test ) = ( 0 , 0 , 0 );
    my @print = () ;

    $! = 0 ;
    my $buffer = &Bufferize( $self, $Socket , \@_ );

    while ( @{$buffer} or &RemoteIsFull( $self, $Socket ) ) {
        if ( @{$buffer} ) {
            $lines ++;
            my $line = shift @{$buffer} ;
            push @print , $line ;
        }

        $test = &RemoteIsFull( $self, $Socket, \@print );
        #$test = $self->{URGENTCOM} unless ( $test or ! defined($self));

        if ( ! @{$buffer} or $test or $urgent ) {
            my $tries = 0 ;

            # Add comTEST to COM
            if ( $test ) {
                if (! defined($ComTest{$Socket})
                    or &tv_interval($ComTest{$Socket}) >= 1) {

                    # Send again comTEST after 1 second
                    local $" = "' '" ;
                    my $comtest = &GetCom( comCOM, $$ , comTEST );
                    $ComTest{$Socket} = [ &gettimeofday() ] ;
                    &UPSTAT('ADD-COMTEST-EVENT'.&SelfStatId($self));

                    if ( $test >= $MAX_BUFFER_SIZE ) {
                        &UPSTAT('KEEP-COMTEST-EVENT'.&SelfStatId($self));
                        if (@print and ! $urgent) {
                            my $print = pop @print ;
                            $lines -- ;
                            unshift @{$buffer}, $print ;
                            &debugdev("Retry later '$print'" .
                                ( @print ? " keeping '@print'":"") .
                                " to be sure to not fullfill remote buffer" .
                                " with '$comtest'")
                                if ( $ADVANCED_DEBUGGING );
                        }

                    } elsif ( $ADVANCED_DEBUGGING ) {
                        &debugdev("Sending '$comtest'" .
                            (@print?" added to '@print'":""));
                    }

                    # Add comtest to print list
                    push @print, $comtest ;

                } elsif (@print) {
                    &UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
                    # Last comTEST was sent recently, we should better
                    # directly keep the com and retry it later
                    $lines -= &RetryLater($self,$buffer,\@print);
                    $total = -1 unless ($total);
                    last ;

                } else {
                    &UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
                    # Remote buffer can be full and we have recently sent a
                    # comTEST and there's nothing to send, so just leave
                    $total = -1 ;
                    last ;
                }
            }

            # Wait when socket is busy
            my $print = "@print" . $" ;
            while (!defined($count = syswrite($Socket, $print))
                and $! == EAGAIN ) {
                &UPSTAT('BUSY-SOCKET-EVENT'.&SelfStatId($self));
                last if ( $tries ++ > 1000 );
                usleep $USLEEP ;
                last if $MUSTQUIT ;
                &Debug("$Socket socket seems busy with errno: $!")
                    unless ( $tries % 20 );
            }

            if ( $tries ) {
                local $" = ' ' ;
                &MAXSTAT('CHARSOCK-RETRY',$tries);
                $STATS{'CHARSOCK-RETRY'.&SelfStatId($self)} = "@print" ;
            }

            if (defined($count)) {
                if ($count) {
                    &RemoteBufSizeUpdate( $self, $Socket, $count );
                    $total += $count ;
                    &MAXSTAT('CHARSOCK-COUNT',$count);
                    &UPSTAT('LINES-SOCK'.&SelfStatId($self,'full'),$lines);

                } elsif ($lines) {
                    &UPSTAT('LINES-SOCK-NOTSENT'.&SelfStatId($self,'full'),
                        $lines);
                    $lines -= &RetryLater($self,$buffer,\@print);
                    $total = -1 unless ($total);
                    last ;
                }

                last if $test ;

            } elsif ($! == EPIPE) {
                close($Socket);
                if ($self->{TID}) {
                    $loggertid = 0 if ($self->getKind eq 'Logger');
                    &Warn("Communication has been reset with ".$self->getName);
                    $do_check ++ ;

                } elsif ( $Socket == $self->{DADDY} ) {
                    $loggertid = 0 ;
                    $MUSTQUIT ++ ;
                    &Warn("Must quit as my socket to parent has been reset");
                }

            } else {
                local $" = "' '" ;
                &Debug("$Socket socket is busy, will retrying '@print' later");
                $STATS{'SOCKET-ERROR-'.int($!).&SelfStatId($self)} =
                    "Can't send '@print'";
                $lines -= &RetryLater($self,$buffer,\@print);
                $total = -1 unless ($total);
                last ;
            }

            @print = () ;
        }

        last if $MUSTQUIT ;
    }

    &TIMESTAT('RealPrintSock');

    if ($total) {
        &MAXSTAT('MAX-TOTALSOCK'.&SelfStatId($self),$total);
        &MAXSTAT('MAX-LINESOCK'.&SelfStatId($self),$lines);
        return @{$buffer} ? - @{$buffer} : ( $lines ? $lines : $total );
    }

    return 0 ;
}

END {
    if ( $COM_LOCKING ) {
        # Close all locking file handles and clean any locking file
        foreach my $key ( keys(%lockfh) ) {
            close($lockfh{$key}) ;
            # %lockfh keys are sockets or lock file names. The folder is
            # quoted to be matched as a literal path, not as a pattern.
            unlink $key if ( $key =~ /^\Q$SERVICE_TMP\E/ and -e $key );
        }
    }
}

&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");

1;