#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# $Id: Thread.pm 3 2007-10-18 16:20:19Z guillaume $
#
package A2P::Thread;
use strict ;
use Socket ;
use Errno qw(:POSIX);
use Fcntl ':flock' ;
use IO::Socket ;
use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
use POSIX qw(:signal_h :sys_wait_h setsid) ;
use A2P::Globals ;
use A2P::Syslog ;
use A2P::threads ;
use A2P::threads qw( threadSleep reduceSleep ) ;
use A2P::Signal ;
use A2P::Signal qw( LogSigMessage ) ;
use A2P::Globals qw( UpdateSharedEnv ) ;
use A2P::Com qw( comREQ comCOM comJOB comUPD comDONE comINF comXML comTEST
IsCom GetCom TakeComLocking ReleaseComLocking GetTmpFile );
use A2P::JobStatus 'a2pjobstate' ;
# Extract the module version from the Subversion "Rev" keyword at compile
# time, so that $VERSION is already set when the code below runs.
BEGIN {
our $VERSION = sprintf "%s", q$Rev: 1399 $ =~ /(\d[0-9.]+)\s+/ ;
}
# Declare the package variable again so it is visible outside of BEGIN
our $VERSION ;
# Hashes used for communications, all keyed by socket
# Lines already read from a socket but not yet returned by ReadSock
my %ReadBuffer = () ;
# COM locking file handles (also keyed by lock file name, see PrintSock)
my %lockfh = () ;
# Base name ("<maintid>-<kind>") of the COM lock file used for a socket
my %SocketProc = () ;
# Number of bytes written to a socket since the last ResetRemoteBufSize
my %BUFLEN = () ;
# Object currently running ForkedLoop in this process, read by END
my $THIS ;
# Constructor shared by the A2P thread classes.
# KIND is the class name without its leading "A2P::" and NAME is
# "<Progname>-<KIND>". Returns the blessed object; the socket pair is only
# created later by init().
sub new {
    my $class = shift ;
    &Debug("new $class v$VERSION");
    # Keep the capture in a lexical rather than reading $1 after an unchecked
    # match: when the class name does not start with "A2P::" the match fails
    # and $1 would still hold the capture of an earlier, unrelated match.
    # In that case the whole class name is used as kind.
    my ( $kind ) = $class =~ /^A2P::(.*)$/ ;
    $kind = $class unless defined($kind) ;
    my $self = {
        KIND       => $kind,
        NAME       => $Progname . '-' . $kind ,
        SIGTERM    => 0,
        SIGKILL    => 0,
        TOPING     => 0,
        LOGGER     => 0,
        BUSY       => [ 0, "" ],
        DO_QUIT    => 0,
        DO_STAT    => [ &gettimeofday() ],
        CanPING    => [ &gettimeofday() ],
        HAVE_STATS => 0,
        STATS      => [],
        DONTSLEEP  => 0,
        URGENTCOM  => 0,
        COMBUFFER  => [],
        JOB        => ""
    };
    return bless $self , $class ;
}
# Second construction step: validates the thread kind and opens the socket
# pair linking parent (SON end) and child (DADDY end).
# Returns 1 on success; on failure records the error with ThreadError and
# returns its value (0).
sub init {
    my ( $thread ) = @_ ;
    &Debug("new A2P::Thread is " . $thread->getName);
    my $kind = $thread->{KIND} ;
    unless ( defined($SonKind{$kind}) ) {
        return $thread->ThreadError("1, '$kind' is not a valid thread kind");
    }
    unless ( socketpair( $thread->{SON}, $thread->{DADDY},
                         AF_UNIX, SOCK_STREAM, PF_UNSPEC ) ) {
        return $thread->ThreadError(
            "2, Can't open socket pair for $kind thread: $!");
    }
    # Both ends get IO::Socket methods (blocking() is used by ReadSock)
    bless $thread->{SON} , "IO::Socket" ;
    bless $thread->{DADDY} , "IO::Socket" ;
    &Debug($thread->getName . " object is ready");
    return 1 ;
}
# Optional table of abort messages indexed by error number (see ABTERM)
our $AbortMessages ;
# Default list of messages put in front of any ABTERM report: none.
sub defautABTERMError {
    return ;
}
# Report an abnormal termination.
# Arguments: an optional leading numeric error code (the caller's line number
# is used when absent) followed by message parts. When $AbortMessages has an
# entry for the code, it is used verbatim for codes below 100 and as a
# sprintf format for the remaining arguments otherwise.
# The messages are logged with Error, forwarded with Return when JOB is
# defined and this thread is not the JobManager, and recorded with
# a2pjobstate when both JOB and STEP are defined. Always returns 0.
sub ABTERM {
    my $self = shift ;
    my ( undef, undef, $line ) = caller ;
    my $Err = $_[0] =~ /^\d+$/ ? shift : $line ;
    @AbortTime = &gettimeofday();
    my @ERR = $self->defautABTERMError() ;
    if ( defined($AbortMessages) and defined($AbortMessages->{$Err}) ) {
        if ( $Err < 100 ) {
            push @ERR, $AbortMessages->{$Err} ;
        } else {
            push @ERR, sprintf( $AbortMessages->{$Err} , @_ ) ;
        }
    } else {
        push @ERR, @_ ;
    }
    &Error( @ERR );
    # Return information to JobManager if JOB defined and not JobManager
    if ( defined($self->{JOB}) and ! $self->is('JobManager') ) {
        foreach my $message ( @ERR ) {
            $self->Return(
                $self->{JOB} => &GetCom( comINF, 'ABTERM', $message ));
        }
    }
    if ( defined($self->{JOB}) and defined($self->{STEP}) ) {
        foreach my $message ( @ERR ) {
            &a2pjobstate( $self->{JOB}, $self->{STEP}, 'A',
                { ABTERM => $message } );
        }
    }
    &UPSTAT('JOB-ABTERM');
    return 0 ;
}
# Cosmetic helper for debugging: returns the BUSY entries that follow the
# first one, joined the way an interpolated array is.
sub getBusyReq {
    my ( $self ) = @_ ;
    my ( undef, @details ) = @{$self->{BUSY}} ;
    return "@details" ;
}
# Replace the BUSY state with the given list; a true first value is counted
# in the 'SET-BUSY-SON' statistic.
sub setBusy {
    my ( $self, @state ) = @_ ;
    @{$self->{BUSY}} = @state ;
    $state[0] and &UPSTAT('SET-BUSY-SON') ;
}
# Without argument, returns the first BUSY value (0 after object creation).
# With an argument, tells whether that first value is numerically equal to it.
sub isBusyBy {
    my ( $self, @who ) = @_ ;
    my $owner = $self->{BUSY}->[0] ;
    return @who ? $owner == $who[0] : $owner ;
}
# Mark the thread as available again: counts the event and resets BUSY to
# its creation value.
sub setReady {
    my ( $self ) = @_ ;
    &UPSTAT('SET-READY-SON');
    return $self->setBusy( 0, "" );
}
# Debugging aid, run from ForkedLoop when $do_test is set (mySigUSR1 sets it).
# Appends to files under /tmp:
#  - every true entry of %STATS         (<Progname>[-<maintid>]-selftest.log)
#  - the lines returned by getBackLogs  (...-selfdebug.log)
#  - the content of some object arrays  (...-self<ARRAY>.log)
# A failed open is reported with Error and ends the sub at once.
sub self_TEST {
    my $self = shift ;
    $do_test = 0 ;
    # NOTE(review): @TT is never filled here, so tv_interval presumably
    # measures from the epoch and subtracting $^T leaves the process age
    # - confirm
    our @TT ;
    $STATS{'RUNNING'} = sprintf("%.01fs", &tv_interval(\@TT) - $^T);
    # Output current memorized dynamic statistics
    my @stats = grep { $STATS{$_} } sort(keys(%STATS)) ;
    my $tag = $maintid ? "-$maintid" : "" ;
    if (@stats) {
        open my $log, ">>", "/tmp/$Progname$tag-selftest.log"
            or return &Error("Can't open log file in /tmp for self-test: $!");
        # Don't check anything as this is not critical
        flock($log, LOCK_EX);
        foreach my $stat ( @stats ) {
            print {$log} sprintf("%50s %s", $stat, $STATS{$stat}), "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
    # Output current memorized log lines
    my @logs = &getBackLogs();
    if ( @logs ) {
        open my $log, ">>", "/tmp/$Progname$tag-selfdebug.log"
            or return &Error("Can't open log file in /tmp for self-debug: $!");
        flock($log, LOCK_EX);
        foreach my $debug ( @logs ) {
            print {$log} $debug, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
    # Output other things
    # The list needs its own parentheses: a bare qw() used in place of them
    # is a syntax error since perl 5.18
    foreach my $Array ( qw( REQUESTS_ARRAY ) ) {
        next unless (defined($self->{$Array}));
        # Work on a copy as array-ref entries are flattened below
        my @Array = @{$self->{$Array}} ;
        next unless (@Array);
        my $ii = 0 ;
        # The zero-padding width is the number of entries
        my $li = "[%0" . scalar(@Array) . "d]: " ;
        open my $log, ">>", "/tmp/$Progname$tag-self$Array.log"
            or return &Error("Can't open log file in /tmp for " . $Array
                . " array: $!");
        flock($log, LOCK_EX);
        foreach my $val ( @Array ) {
            $val = ref($val) =~ /^ARRAY/i ? join('',@{$val}) : $val ;
            print {$log} $Array, sprintf($li,$ii++), $val, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
}
# Forget this thread object: drop its entry from %sons (indexed by TID) and
# release the local reference. The debug trace is skipped for objects
# flagged by setToClean.
sub Delete {
    my $self = shift ;
    my $tid = $self->{TID} ;
    defined($self->{TOCLEAN})
        or &Debug("Deleting T$tid thread from sons list");
    $sons{$tid} = 0 ;
    delete $sons{$tid} ;
    undef $self
}
# Log an error message prefixed by the thread name and by the file base name
# and line of the caller. Always returns 0.
sub ThisError {
    my ( $self, $message ) = @_ ;
    my ( undef, $filename, $line ) = caller ;
    # Only keep the last path component of the caller file name in $1
    $filename =~ m|([^/]+)$| ;
    &Error($self->getName . ": from $1, L.$line: " . $message);
    return 0 ;
}
# Default loop debugging: traces the written byte count kept for each socket.
sub commonloopdebug {
    my @report = ( "Current BUFLEN:" ) ;
    foreach my $socket ( keys(%BUFLEN) ) {
        push @report, "$socket -> $BUFLEN{$socket}" ;
    }
    &Debug( @report );
}
# loopdebug is the name called from ForkedLoop; it defaults to the above
sub loopdebug ;
*loopdebug = \&commonloopdebug ;
# Keep an error text in the object (reported by DESTROY) and return 0 so
# callers can write "return $self->ThreadError(...)".
sub ThreadError {
    my ( $self, $error ) = @_ ;
    $self->{ERROR} = $error ;
    return 0 ;
}
# Flag the object so that Delete and DESTROY skip their usual checks.
sub setToClean {
    my ( $self ) = @_ ;
    return $self->{TOCLEAN} = 1 ;
}
# Only called from parent: tell this thread which logger TID to use.
# Returns 0 when the logger is unchanged or the thread has no TID, otherwise
# the result of the _INIT request.
sub SwitchLogger {
    my ( $self, $logger ) = @_ ;
    if ( $self->{LOGGER} == $logger or ! $self->{TID} ) {
        return 0 ;
    }
    &Debug("Switching logger to T$logger for " . $self->getName);
    $self->{LOGGER} = $logger ;
    return $self->AskThread( _INIT , "$logger" );
}
# Accessor: name of the thread as used in log messages.
sub getName {
    my ( $self ) = @_ ;
    return $self->{NAME} ;
}
# Accessor: kind of the thread (class name without "A2P::").
sub getKind {
    my ( $self ) = @_ ;
    return $self->{KIND} ;
}
# Returns the given kind names that are equal to this thread's kind
# (their count in scalar context), so it is true when one of them matches.
sub is {
    my ( $self, @kinds ) = @_ ;
    my $mine = $self->getKind() ;
    return grep { $mine eq $_ } @kinds ;
}
# Returns 1 once Fork has recorded its FORKED timestamp, 0 before.
sub isForked {
    my ( $self ) = @_ ;
    return 1 if defined($self->{FORKED}) ;
    return 0 ;
}
# Probe the forked process with signal 0.
# Returns 0 when there is no TID or the probe fails, 1 otherwise.
sub isAlive {
    my ( $self ) = @_ ;
    return 0 unless ( $self->{TID} );
    my $alive = kill( 0 , $self->{TID} ) ? 1 : 0 ;
    # TODO: check errno (p. 371 of "linux PT")
    &Debug( $self->getName . ( $alive ? " seems alive" : " seems dead" ) );
    return $alive ;
}
# Tells whether the thread should be checked now: never while it is quitting,
# otherwise only when the last check is more than 15 seconds old and
# CanPing() agrees.
sub isToCheck {
    my ( $self ) = @_ ;
    return 0 if (defined($self->{QUITTING}));
    return 0 unless ( $self->LastCheck() > 15 );
    return $self->CanPing() ;
}
# Memorize and flush thread statistics.
# Does nothing unless the object has HAVE_STATS set, $KEEP_STATS is true and
# $STATS_FOLDER is a directory. When called with arguments, a new record
# (date, time, then whatever myStats returns for the arguments) is queued.
# Queued records are appended to "<STATS_FOLDER>/<name>-<date>", one file per
# date, once more than one second has passed since the last flush or when
# the thread is quitting.
# On any file error, every file opened by this call is unlocked and closed,
# the records stay queued, and the ThreadError value (0) is returned.
sub KeepStats {
    my $self = shift ;
    return unless ( $self->{HAVE_STATS} and $KEEP_STATS and -d $STATS_FOLDER );
    if (@_) {
        my @time = localtime ;
        push @{$self->{STATS}}, [
            sprintf('%4d%02d%02d', 1900+$time[5], $time[4]+1, $time[3]),
            sprintf('%02d%02d%02d', $time[2], $time[1], $time[0])
            , $self->myStats(\@_) ];
    }
    # Only keep stat after at least 1 second has passed, and there's some
    # But keep output stats if we are quitting
    return unless ( @{$self->{STATS}}
        and ($self->{DO_QUIT} or &tv_interval($self->{DO_STAT}) > 1));
    # Opened stat files, indexed by file name
    my %Locked = () ;
    # Unlock and close every opened stat file, also used on error paths so
    # no locked handle is left behind
    my $release = sub {
        foreach my $handle ( values(%Locked) ) {
            flock($handle, LOCK_UN);
            close($handle);
        }
        %Locked = () ;
    };
    foreach my $Stat ( @{$self->{STATS}} ) {
        # Compute filename
        my $statfile = $self->getName() . '-' . $Stat->[0] ;
        # One fresh handle per file: re-opening a scalar that already holds
        # a handle would silently close the previously opened file
        my $File = $Locked{$statfile} ;
        unless ( $File ) {
            unless ( open $File, '>>', "$STATS_FOLDER/$statfile" ) {
                my $error = "$!" ;
                $release->();
                return $self->ThreadError("Can't open stats file '" .
                    $statfile . "': $error");
            }
            $Locked{$statfile} = $File ;
            # Try exclusive lock
            unless ( flock($File, LOCK_EX) ) {
                my $error = "$!" ;
                $release->();
                return $self->ThreadError("Can't lock stats file '" .
                    $statfile . "': $error");
            }
            if ( defined($self->{STATHEADERS}) and @{$self->{STATHEADERS}} ) {
                # Force again headers output after a restart, in case they
                # have changed
                unless ( print {$File} join(';', @{$self->{STATHEADERS}}),
                                       "\n" ) {
                    my $error = "$!" ;
                    $release->();
                    return $self->ThreadError("Can't output in file '" .
                        $statfile . "': $error");
                }
                @{$self->{STATHEADERS}} = () ;
            }
        }
        unless ( print {$File} join(';', @{$Stat}), "\n" ) {
            my $error = "$!" ;
            $release->();
            return $self->ThreadError("Can't output in statfile '" .
                $statfile . "': $error");
        }
    }
    $self->{STATS} = [ ] ;
    # Unlock and close stat files
    $release->();
    # Reset stats timer
    $self->{DO_STAT} = [ &gettimeofday() ];
}
# Seconds elapsed since the CHECKED timestamp, or 100 when the thread has
# never been checked.
sub LastCheck {
    my ( $self ) = @_ ;
    return 100 unless defined($self->{CHECKED}) ;
    return &tv_interval($self->{CHECKED}) ;
}
# Account one more pending ping and restart both the check and ping timers.
# Every fourth pending ping an information message is logged; the value it
# prints is the TOPING counter itself.
sub Check {
    my ( $self ) = @_ ;
    $self->{TOPING} ++ ;
    # A thread is really busy if it has not said anything for 60 seconds
    # (see isToCheck())
    my $pending = $self->{TOPING} ;
    unless ( $pending and $pending % 4 ) {
        &Info($self->getName . " thread seems busy since " .
            sprintf("%.2f s",$self->{TOPING}));
    }
    $self->{CHECKED} = [ &gettimeofday() ];
    $self->{CanPING} = [ &gettimeofday() ];
}
# Send a PING request when at least one ping is pending and CanPing() allows
# a new one.
sub Ping {
    my ( $self ) = @_ ;
    $self->{TOPING} and $self->CanPing() and $self->AskThread( PING ) ;
}
# A ping answer was received: one ping less is pending and both the check
# and ping timers restart now.
sub GotPing {
    my ( $self ) = @_ ;
    $self->{TOPING} -- ;
    $self->{CHECKED} = [ &gettimeofday() ];
    $self->{CanPING} = [ &gettimeofday() ];
}
# We can repeat a ping only if last ping was done more than 5 seconds ago.
# Returns 1 and restarts the ping timer in that case, 0 otherwise (also when
# no ping timer exists).
sub CanPing {
    my ( $self ) = @_ ;
    my $last = $self->{CanPING} ;
    return 0 unless (defined($last));
    return 0 unless ( &tv_interval($last) > 5 );
    $self->{CanPING} = [ &gettimeofday() ] ;
    return 1 ;
}
# Send SIGTERM one time, when the thread has been stopping for more than 5
# and less than 30 seconds. Returns 0 when nothing was sent, the updated
# SIGTERM counter otherwise.
sub SendSigTerm {
    my ( $self ) = @_ ;
    my $since = $self->StoppingSince();
    if ( $self->{SIGTERM} or $since <= 5 or $since >= 30 ) {
        return 0 ;
    }
    &Debug("Killing " . $self->getName . " with SIGTERM");
    kill 15, $self->{TID} ;
    return ++ $self->{SIGTERM} ;
}
# Send SIGKILL one time, once the thread has been stopping for at least 30
# seconds. Returns 0 when nothing was sent, the updated SIGKILL counter
# otherwise.
sub SendSigKill {
    my ( $self ) = @_ ;
    if ( $self->{SIGKILL} or $self->StoppingSince() < 30 ) {
        return 0 ;
    }
    &Debug("Killing " . $self->getName . " with SIGKILL");
    kill 9, $self->{TID} ;
    return ++ $self->{SIGKILL} ;
}
################################################################################
## Fork code ##
################################################################################
# Fork the process backing this thread object.
# Returns undef when the object was already forked or when fork failed.
# In the parent, returns the child PID after registering the SON socket.
# In the child this member never returns: it resets the inherited state,
# installs the signal handlers, runs ThreadInitEarly, ThreadInit and
# ForkedLoop, then exits with the ForkedLoop return value.
sub Fork {
my $self = shift;
# Security: Only one fork call can be done by created Thread object
return undef if ($self->isForked());
# Classical way to fork
$self->{TID} = fork ;
# Return to parent early if not forked
return undef unless (defined($self->{TID}));
$self->{FORKED} = [ &gettimeofday() ];
# Clean SYSLOG socket (this code runs in both the parent and the child)
if (defined($SYSLOG)) {
shutdown($SYSLOG,2);
$SYSLOG = undef ;
&Debug("SYSLOG socket is reset");
}
# Return to parent
if ( $self->{TID} ) {
#============ In parent =============
&Debug("Parent-to-child link is " . $self->{SON} );
push @PIPE , $self->{SON} ;
# Set the filename for COM locking
$SocketProc{$self->{SON}} = "$maintid-" . $self->getKind()
if $COM_LOCKING ;
# Can define logger socket for parent
&SetLogger( $self ) if ($self->is('Logger'));
# Update object name as it is used in logging messages
$self->{NAME} .= '[' . $self->{TID} . ']' ;
# Return to caller in parent
return $self->{TID} ;
}
#=== In child, we won't leave this object member ===
# Clean error status
$? = 0 ;
$! = 0 ;
# Clean open COM locking sockets and communications hashes
if ( $COM_LOCKING ) {
map { close($lockfh{$_}) } keys(%lockfh);
%SocketProc = () ;
%lockfh = () ;
}
%ReadBuffer = () ;
%BUFLEN = () ;
# Clear stats
map { $STATS{$_} = 0 } keys(%STATS) ;
# Update signal handling immediately,
# Keep in mind these signal handlers won't be called as object member
$SIG{'USR1'} = \&mySigUSR1 ;
$SIG{'ALRM'} = \&mySigALRM ;
$SIG{'TERM'} = \&mySigTERM ;
$SIG{'QUIT'} = \&mySigTERM ;
$SIG{'HUP'} = \&mySigHUP ;
$SIG{'ABRT'} = \&mySigTERM ;
# Update child process name
$0 = $self->getName ; # Update process name with 'ps' command
$Progname = $self->getName ; # For logging
# Reduce priority as parent must be the one with the greater priority
my $priority = getpriority( 0, $$ );
if ( $priority < 15 ) {
# Raise our nice value by 5, which lowers our scheduling priority
setpriority 0, 0, $priority + 5
or &Warn("Can't update priority: $!");
}
$self->ThreadInitEarly();
&Debug("Child " . $self->getName . " is starting...");
&Debug("Child-to-parent link is " . $self->{DADDY} );
push @PIPE , $self->{DADDY} ;
# Cleaning this process from previously created objects: every %sons entry
# whose key is not our own PID is a copy inherited from the parent
map {
if ( $_ != $$ ) {
&Debug("Cleaning unused T$_ cloned object during fork");
$sons{$_}->setToClean() ; # Just to avoid checking from DESTROY sub
$sons{$_}->Delete();
}
} keys(%sons);
&Debug("Memory cleaned");
&Debug("Starting " . $self->getName . " T$$");
$self->ThreadInit();
# Serve requests until asked to quit
my $Ret = $self->ForkedLoop ;
$loggertid = 0 ; # Force to log directly
&LogSigMessage();
&Debug($self->getName . " ended ($MUSTQUIT;$!;$?;$@)");
&Debug("Forced to quit ($MUSTQUIT)") if $MUSTQUIT ;
# Tell the parent we are quitting, flush stats and run the subclass hook
$self->Answer( QUIT );
$self->KeepStats();
$self->DoBeforeQuit();
exit $Ret ;
}
# Coms starting with '<' that ForkedLoop could not interpret, kept as
# [ text, retry count ] pairs until they can be reassembled
my @bad_answer = () ;
# Reassembled coms waiting to be served again by GetRequest
my @KeepRequest = () ;
# Main loop of a forked child, run until $MUSTQUIT or DO_QUIT is set.
# Each turn: log pending signal messages, commit pending job status at least
# once a minute, sleep, flush the output buffer to the parent, flush stats,
# then serve up to $COM_BURST requests read from the parent socket:
#  - comREQ: QUIT, PING, _INIT (ENV update, shared vars refresh or logger
#    TID) and TODO (handed to Do())
#  - comUPD: direct update of a variable listed in @SHARED
#  - comCOM: only comDONE, which resets the remote buffer size accounting
#  - comXML: handed to XML() when the object has that member
#  - anything else is handled as a possibly fragmented com, else as an error
# Returns 1 when at least one error has been counted, 0 otherwise.
sub ForkedLoop {
    my $self = $THIS = shift ;
    my $ErrCount = 0 ;
    my $request ;
    my $LoopTiming = [ &gettimeofday() ];
    my $PendingStatus = [ &gettimeofday() ];
    my $LoopCount = 0 ;
    $! = 0 ;
    while ( ! ( $MUSTQUIT or $self->{DO_QUIT} ) ) {
        &TIMESTAT('ForkedLoop');
        &LogSigMessage();
        # Check to commit any pending job status at least each minute
        if (&tv_interval($PendingStatus) > 60) {
            &a2pjobstate ;
            $PendingStatus = [ &gettimeofday() ];
        }
        &UPSTAT('LOOPS');
        $self->{DONTSLEEP} ? &reduceSleep('no stats') : &threadSleep() ;
        # Just to flush buffer
        $self->PrintSock($self->{DADDY});
        $self->KeepStats();
        # Test thread sub, can be useful while debugging
        $self->self_TEST() if $do_test ;
        my $max = $COM_BURST ;
        while ( $max-- and
            (defined( $request = $self->GetRequest($self->{DADDY})))) {
            last if ( $MUSTQUIT or $self->{DO_QUIT} );
            chomp $request ;
            my @Req = &IsCom( comREQ , $request );
            if ( @Req == 2 ) {
                if ( $Req[0] == QUIT ) { # Do I have to quit
                    &Debug("Notified to QUIT by parent");
                    $self->{DO_QUIT} = 1 ;
                } elsif ( $Req[0] == PING ) { # Do I have to ping
                    $self->Answer( PING );
                    &Debug("PING");
                } elsif ( $Req[0] == _INIT ) { # ENV update
                    $self->Answer( _INIT );
                    if ( $Req[1] =~ m|^\s*(\$ENV{\w+}\s*=.*)$|i ) {
                        if ( $self->{clientrequest} ) {
                            # SECURITY issue:
                            # Only Listener is concerned: no external init
                            # accepted for now
                            $self->CloseClient( $self->{clientrequest}, 1,
                                "INIT is not supported from other application");
                        } else {
                            &Debug("Do INIT: $1");
                            # NOTE(review): string eval of text received on
                            # the socket, only safe as long as the peer is
                            # the trusted parent process
                            eval $1 ;
                        }
                    } elsif ( $Req[1] =~ m|^CAN_UPDATE_NOW$|i ) {
                        # Update shared vars with ENV definition
                        &UpdateSharedEnv ;
                        # Reset Debug behavior
                        &ResetDebug() ;
                        $self->InitUpdated();
                    } elsif ( $Req[1] =~ m|^(\d+)$|i ) {
                        if ( $1 == $$ ) {
                            &Debug("Approuved as logger");
                        } elsif ( $self->is('Logger') ) {
                            &Debug("Revoked as logger thread");
                            # We should quit as we are no more used for service
                            $self->{DO_QUIT} = 1 ;
                        } else {
                            &Debug("My logger is T$1") if $1 ;
                            $loggertid = $1 ;
                        }
                    } else {
                        $self->ThisError("Not able to evaluate '" . $Req[1] .
                            "' as ENV update");
                        $ErrCount ++ ;
                    }
                } elsif ( $Req[0] == TODO ) {
                    &Debug("Got 'TODO' COM: '$Req[1]'") if $ADVANCED_DEBUGGING ;
                    $ErrCount ++ unless $self->Do( \$Req[1] );
                    # Called to do our job so we should sleep less next time
                    &reduceSleep();
                } else {
                    $self->ThisError("[DEV] Can't decide request " .
                        (defined($means{$request})?$means{$request}:$request));
                    $ErrCount ++ ;
                }
            # TODO Test me with also $Req[0] == _UPDATE
            } elsif ( @Req = &IsCom( comUPD , $request ) ) {
                if ( $Req[1] =~ m|^(\w+)=(["'])?(.*)\2$|i ) {
                    my ( $var , $value ) = ( $1 , $3 );
                    if (defined($1) and defined($3)) {
                        # NOTE(review): the name is used as an unanchored
                        # pattern against @SHARED and the value is put in a
                        # string eval without escaping - confirm only the
                        # trusted parent can send this
                        if ( grep { /$var/ } @SHARED ) {
                            my $apos = $value =~ /^\d+$/ ? '' : '"' ;
                            eval '$' . $var . '=' . $apos . $value . $apos ;
                            # For important thread initialization
                            $self->InitUpdated();
                            &UPSTAT($var.'_DIRECT_UPDATE');
                        } else {
                            $self->ThisError("Not authorized to update '" . $var
                                . "' to '$value'");
                            &UPSTAT('UNAUTH_DIRECT_UPDATE');
                            $ErrCount ++ ;
                        }
                    } else {
                        $self->ThisError("Bad direct update request '$Req[1]'");
                        &UPSTAT('BAD_DIRECT_UPDATE');
                        $ErrCount ++ ;
                    }
                } else {
                    $self->ThisError("Bad formated update request '$Req[1]'");
                    &UPSTAT('BADFORMAT_UPDATE');
                    $ErrCount ++ ;
                }
            } elsif ( @Req = &IsCom( comCOM , $request ) ) {
                if ( $Req[1] == comDONE ) {
                    &UPSTAT('GETCOMDONE');
                    &Debug("Received comDONE from parent")
                        if $ADVANCED_DEBUGGING ;
                    $self->ResetRemoteBufSize($self->{DADDY});
                } else {
                    $self->ThisError("[DEV] Can't handle request '$request'");
                    $ErrCount ++ ;
                }
            # Handle XML request
            } elsif ( @Req = &IsCom( comXML , $request ) ) {
                &Debug("Got an XML request");
                # Check we have XML API and call it
                if (UNIVERSAL::can( $self , 'XML' )) {
                    $self->XML($request);
                } else {
                    $self->ThisError("[DEV] Not expected to handle '" . $request
                        . "' XML request");
                    $ErrCount ++ ;
                }
            } else {
                if ( $request =~ /^[<]/ ) {
                    my $ref ;
                    &UPSTAT('COM-CHECK-FRAGMENT');
                    # Control this com is still not handled
                    if (($ref) = grep { $_->[0] eq $request } @bad_answer ) {
                        # Don't keep too much time: after 100 tries it is
                        # reported as an error below
                        next if (++ $ref->[1] < 100);
                    } else {
                        # Will try to reassemble a fragmented com
                        push @bad_answer, [ $request, 0 ] ;
                        $max ++ ;
                        next ;
                    }
                } elsif (@bad_answer) {
                    # Try to assemble and reinject immediately
                    my $try = shift @bad_answer ;
                    my $com = $try->[0] . $request ;
                    &Info("Trying reassembled com: $com");
                    push @KeepRequest, $com ;
                    &UPSTAT('COM-REASSEMBLED');
                    $max ++ ;
                    next ;
                }
                # This is only critical if not in Logger
                if ($self->getKind() eq 'Logger') {
                    &Warn("Critical error on " . $self->getName);
                } else {
                    &Alert("Critical error on " . $self->getName);
                }
                $self->ThisError("[DEV] Can't interpret request '$request'");
                $ErrCount ++ ;
            }
        }# Loop COM_BURST
        # Check parent is alive
        unless ( kill 0, $maintid ) {
            # TODO: check errno (p. 371 of "linux pt")
            $loggertid = 0 ;
            &Warn("Quitting as parent seems dead");
            $MUSTQUIT = 1 ;
        }
        { # Rating the do loop
            no integer ;
            $LoopCount += 100 ;
            my $int = &tv_interval($LoopTiming);
            if ( $int >= 1 ) {
                # The rate must be computed before the counter is reset,
                # otherwise the statistic is always 0
                &MAXSTAT('DOLOOP-RATE',int($LoopCount/$int)/100);
                $LoopCount = 0 ;
                $LoopTiming = [ &gettimeofday() ];
            }
        }
        if ( $ADVANCED_DEBUGGING ) {
            &Debug("Loop rate is " . $STATS{'DOLOOP-RATE'} );
            $self->loopdebug();
        }
        &TIMESTAT('ForkedLoop');
    }# Loop to read queue
    undef $THIS ;
    &Debug("$ErrCount counted errors") if $ErrCount ;
    return $ErrCount ? 1 : 0 ;
}
################################################################################
## End Fork code ##
################################################################################
# Hook run in the child just before ForkedLoop; nothing to do by default.
sub ThreadInit {
    return 1 ;
}
# Hook run early in the child: directly link Log answers to parent
# (SetLogger is defined in Syslog.pm).
# This member must be overridden in the Logger object.
sub ThreadInitEarly {
    my ( $self ) = @_ ;
    &SetLogger( $self );
}
# Hook run by ForkedLoop after shared variables were updated; nothing to do
# by default.
sub InitUpdated {
    return 1 ;
}
# Next request to serve: a reassembled com if one is waiting, 1 when the
# thread has to quit, otherwise the next line read from the given socket
# (undef when there is none).
sub GetRequest {
    my ( $self, $Requests ) = @_ ;
    # Check if we got a fragmented not safe request
    return shift @KeepRequest if @KeepRequest ;
    return 1 if ( $MUSTQUIT or $self->{DO_QUIT} );
    return $self->ReadSock($Requests) ;
}
# Default TODO handler: only traces the request text, which is passed as a
# reference to a scalar.
sub Do {
    my ( $self, $ref ) = @_ ;
    my $todo = ${$ref} ;
    &Debug("Got to do '$todo'");
}
# Hook run in the child just before exit; nothing to do by default.
sub DoBeforeQuit {
    return 1 ;
}
# SIGUSR1 handler: memorizes a debug message and asks ForkedLoop to run
# self_TEST on its next turn.
sub mySigUSR1 {
    my $note = "SIGUSR1 received in " . __PACKAGE__ ;
    push @SigDebug, $note ;
    $do_test ++ ;
}
# SIGALRM handler, ignored unless $SCAN_SPOOL is set: increments $do_file
# while it is below $MAXTASK and memorizes a debug message.
sub mySigALRM {
    return unless ($SCAN_SPOOL);
    if ( $do_file < $MAXTASK ) {
        $do_file ++ ;
    }
    push @SigDebug, "SIGALRM received (do_file=$do_file) in " . __PACKAGE__ ;
}
# Handler installed for TERM, QUIT and ABRT: memorizes a message (as warning
# for ABRT, as debug otherwise) and asks the main loop to quit.
sub mySigTERM {
    my ( $signame ) = @_ ;
    my $ref ;
    if ( $signame eq 'ABRT' ) {
        $ref = \@SigWarn ;
    } else {
        $ref = \@SigDebug ;
    }
    push @{$ref}, "SIG$signame received in " . __PACKAGE__ ;
    $MUSTQUIT ++ ;
    $NO_SYSLOG_DEBUG = 0 if $SERVICE_DEBUG ;
}
# SIGHUP handler: only memorizes a debug message.
sub mySigHUP {
    my $note = "SIGHUP received in " . __PACKAGE__ ;
    push @SigDebug, $note ;
}
# SIGPIPE handler: memorizes where the signal was caught and asks to quit
# when no defined socket is left in @PIPE.
sub mySigPIPE {
    my ( $signame ) = @_ ;
    my ( $package, undef, $line ) = caller();
    push @SigDebug , "SIG$signame received at L. $line in $package: $! $?" ;
    my $usable = grep { defined($_) } @PIPE ;
    $MUSTQUIT ++ unless ( $usable );
}
# Default statistics formatter used by KeepStats: returns the values of the
# given array reference unchanged.
sub myStats {
    my ( undef, $values ) = @_ ;
    return @{$values} ;
}
# Send a comREQ request built from the arguments to the child (SON socket).
# More than 10 arguments are refused with an error. Request length
# statistics are updated, and a request longer than 9999 characters is also
# dumped in a "lastbigreq" file. Returns the PrintSock result.
sub AskThread {
    my $self = shift ;
    my $Asking = $self->{SON} ;
    return &Error("[DEV] Too large argument for AskThread: @_")
        if (@_ > 10);
    my $msg = &GetCom( comREQ , @_ );
    my $len = length($msg);
    &MAXSTAT('REQUEST-LEN', $len);
    # First length class the request fits in, if any
    my ( $class ) = grep { $len < $_->[0] } (
        [   50, 'REQUEST-LEN-000-049' ],
        [  100, 'REQUEST-LEN-050-099' ],
        [  200, 'REQUEST-LEN-100-199' ],
        [  500, 'REQUEST-LEN-200-499' ],
        [ 1000, 'REQUEST-LEN-500-999' ] );
    if ( defined($class) ) {
        &UPSTAT($class->[1]);
    } else {
        &UPSTAT('REQUEST-LEN-1000+');
        if ( $len > 9999 ) {
            open LOG, ">" . &GetTmpFile("$Progname-lastbigreq.log")
                or return &Error("Can't open big req log file in TMP: $!");
            print LOG $msg, "\n" ;
            close(LOG);
        }
    }
    &UPSTAT("SENDLINES-".$self->{KIND});
    return $self->PrintSock( $Asking, $msg ) ;
}
# Answer to the parent that the given job is done (comJOB wrapping comDONE).
sub AnswerDone {
    my ( $self, $job ) = @_ ;
    $self->Answer( &GetCom( comJOB , $job , &GetCom( comDONE )));
}
# Acknowledge a comTEST with an urgent comDONE: sent on the SON socket from
# the main process, on the DADDY socket from a child. URGENTCOM is raised
# while sending so the com is put in front of the output buffer.
sub AnswerComDone {
    my $self = shift ;
    my $socket = ( $$ == $maintid ) ? $self->{SON} : $self->{DADDY} ;
    $self->{URGENTCOM} = 1 ;
    if (! defined($socket)) {
        $self->ThisError("Can't answer comDONE: socket not available");
    } else {
        $! = 0 ;
        unless ($self->PrintSock($socket,&GetCom( comCOM , $$ , comDONE ))) {
            $self->ThisError("Can't answer comDONE".(($!)?": $!":""));
        }
    }
    $self->{URGENTCOM} = 0 ;
}
# Answer with a request addressed to JobManager and carrying a comJOB built
# from the arguments.
sub Return {
    my ( $self, @job ) = @_ ;
    $self->Answer( &GetCom( comREQ , JobManager => &GetCom( comJOB , @job )));
}
# Send an answer to the parent on the DADDY socket as a comCOM carrying our
# PID; several arguments are first packed in a comJOB.
# Nothing is sent without socket or arguments, nor when the com is longer
# than $MAX_BUFFER_SIZE - 4096: such a com is appended to a "badcom" file
# and reported as an error. Returns 0 when PrintSock fails.
sub Answer {
    my $self = shift ;
    my $Answers = $self->{DADDY} ;
    return &UPSTAT('Answer-ERROR') unless ( defined($Answers) and @_ );
    my $msg = &GetCom( comCOM, $$ => ( @_ > 1 ? &GetCom( comJOB, @_ ): $_[0] ));
    my $len = length($msg);
    &MAXSTAT('Answer-MAXLEN',$len);
    # First length class the answer fits in, if any
    my ( $class ) = grep { $len < $_->[0] } (
        [   50, 'Answer-LEN-000-049' ],
        [  100, 'Answer-LEN-050-099' ],
        [  200, 'Answer-LEN-100-199' ],
        [  500, 'Answer-LEN-200-499' ],
        [ 1000, 'Answer-LEN-500-999' ] );
    if ( defined($class) ) {
        &UPSTAT($class->[1]);
    } else {
        &UPSTAT('Answer-LEN-1000+');
    }
    if ( $len > $MAX_BUFFER_SIZE - 4096 ) {
        &UPSTAT("BADCOM-LEN-$len");
        open LOG, ">>" . &GetTmpFile("$Progname-$maintid-badcom.log")
            or return &Error("Can't open badcom log file in TMP: $!");
        print LOG $msg, "\n" ;
        close(LOG);
        return &Error("Won't send too long ($len) com");
    }
    unless ($self->PrintSock($Answers, $msg )) {
        &Debug("Answer not sent, ".__PACKAGE__.", l. ".__LINE__." ($!-$?-$@)");
        return 0 ;
    }
}
# Answer to the parent with a comREQ built from the arguments; does nothing
# without argument.
sub Request {
    my ( $self, @request ) = @_ ;
    return unless @request ;
    $self->Answer( &GetCom( comREQ , @request ) );
}
# Read up to $COM_BURST lines sent by the child on the SON socket and return
# the non-empty ones. Length statistics are updated for each kept line and
# the READLINES-<kind> counter is updated with the number of lines returned.
sub getAnswers { # Should only be used by parent thread
    my $self = shift;
    my $max = $COM_BURST ;
    my @mesg = () ;
    # Stop on undef only (nothing left to read): testing the truth of the
    # line would end the burst on an empty line instead of skipping it
    while ( $max-- and defined( my $msg = $self->ReadSock($self->{SON}) ) ) {
        chomp $msg ;
        next unless $msg ; # Skip empty lines
        {
            my $len = length($msg);
            &MAXSTAT('ANSWER-MAXLEN',$len);
            if ( $len < 50 ) { &UPSTAT('ANSWER-LEN-000-049');
            } elsif ( $len < 100 ) { &UPSTAT('ANSWER-LEN-050-099');
            } elsif ( $len < 200 ) { &UPSTAT('ANSWER-LEN-100-199');
            } elsif ( $len < 500 ) { &UPSTAT('ANSWER-LEN-200-499');
            } elsif ( $len < 1000) { &UPSTAT('ANSWER-LEN-500-999');
            } else { &UPSTAT('ANSWER-LEN-1000+'); }
        }
        push @mesg, $msg ;
    }
    # Update read line counter
    &UPSTAT("READLINES-".$self->{KIND},scalar(@mesg));
    return @mesg ;
}
# Ask the child to quit, one time: returns 1 at once when already asked.
# The QUIT request is retried while AskThread returns a negative value;
# after half a second without success, 0 is returned.
sub AskToStop {
    my ( $self ) = @_ ;
    return 1 if ($self->StoppingSince());
    $self->{QUITTING} = [ &gettimeofday() ];
    until ( $self->AskThread( QUIT ) >= 0 ) {
        usleep $USLEEP ;
        if ( $self->StoppingSince() >= 0.5 ) {
            &Info("Can't ask to quit: $! - $? - $@");
            return 0;
        }
    }
    return 1 ;
}
# Send an _INIT request carrying the given text and count it as pending.
# When the request can't be sent, the error is recorded with ThreadError.
sub AskInit {
    my ( $self, $init ) = @_ ;
    unless ($self->AskThread( _INIT , "$init" )) {
        return $self->ThreadError("1, can't ask to init '$init'");
    }
    $self->{_INIT} ++ ;
}
# An _INIT answer was received: one less is pending, and a debug message is
# traced when none is left.
sub GotInit {
    my ( $self ) = @_ ;
    $self->{_INIT} -- ;
    $self->{_INIT}
        or &Debug("Thread " . $self->getName . " ENV updated");
}
# Seconds elapsed since the thread was asked to quit, 0 when it was not.
sub StoppingSince {
    my ( $self ) = @_ ;
    return 0 unless defined($self->{QUITTING}) ;
    return &tv_interval( $self->{QUITTING} ) ;
}
# Forward log content through the sockets: a child sends it to its parent
# as a comCOM, the main process sends it to the Logger thread as a TODO
# request. Asking the main process to log through another kind of thread is
# an error. Returns 0 when nothing could be sent.
sub DoLog {
    my $self = shift ;
    &UPSTAT('DOLOG');
    my $good = 0 ;
    if ( $$ != $maintid ) {
        $good = $self->PrintSock($self->{DADDY}, &GetCom( comCOM , $$ , @_ ));
    } elsif ($self->is('Logger')) {
        $good = $self->PrintSock($self->{SON}, &GetCom( comREQ, TODO, @_ ));
    } else {
        &debugdev("Not expected to handle '@_' here");
        &UPSTAT('DOLOG-ERROR');
    }
    unless ($good) {
        &UPSTAT('BAD-DOLOG');
        return 0 ;
    }
}
# Process end handler: only acts while an object is still running ForkedLoop
# in this process ($THIS is set). When a child other than JobManager ends on
# a job without having been asked to quit and with pending signal warnings
# or infos, the memorized signal messages are forwarded to JobManager,
# followed by a 255 status and the job completion.
sub END {
    my $self = $THIS;
    return unless (defined($self));
    if ( $self->{JOB} ) {
        &Debug("Thread T$$ was " . ($self->{DO_QUIT}?"quitting ":"") .
            "on job $self->{JOB}");
    }
    # Return any recent Sig message when thread seems aborting unexpectedly
    if ( $$ != $maintid and ! $self->{DO_QUIT} and ! $self->is('JobManager')
        and $self->{JOB} and defined($self->{DADDY})
        and (@SigWarn or @SigInfo))
    {
        # Try to advertize JobManager
        foreach my $warning ( @SigWarn ) {
            $self->Return(
                $self->{JOB} => &GetCom( comINF, Warning => $warning ));
        }
        foreach my $info ( @SigInfo ) {
            $self->Return(
                $self->{JOB} => &GetCom( comINF, Info => $info ));
        }
        foreach my $debug ( @SigDebug ) {
            $self->Return(
                $self->{JOB} => &GetCom( comINF, Debug => $debug ));
        }
        $self->Return( $self->{JOB} , 255 );
        $self->AnswerDone( $self->{JOB} );
        $self->{JOB} = "" ;
    }
    # Update loggertid to log directly
    &LogSigMessage();
}
# Hook called first by DESTROY; empty by default
sub PreDESTROY {}
# Destructor. Unless the object was flagged by setToClean, it reports an
# abnormal end of a child, traces the age of the forked process, kills that
# process with SIGKILL when it is still alive and logs any ThreadError text.
# In every case the sockets are shut down when the object was never forked
# or when running in the main process.
sub DESTROY {
my $self = shift;
$self->PreDESTROY();
if (!defined($self->{TOCLEAN})) {
# Return any recent Sig message when thread seems aborting unexpectedly
if ( $$ != $maintid and ! $self->{DO_QUIT} and ! $self->is('JobManager')
and ( $self->{JOB} or !defined($self->{DADDY}) )
and ( @SigWarn or @SigInfo ) )
{
&Alert("T$$ seems aborted abnormally".(defined($self->{DADDY})?"":
" with socket to parent lost"), @SigWarn, @SigInfo, @SigDebug );
}
# Update loggertid to log directly
&LogSigMessage();
# Debug age
&Debug(sprintf("%s Age = %.2f s", $self->{NAME},
&tv_interval( $self->{FORKED} )))
if ($self->isForked());
# Must kill forked thread if alive
if ( $self->isForked() and $self->isAlive() ) {
&Debug("Killing thread object $self->{NAME}");
kill 9, $self->{TID} ;
}
if (defined($self->{ERROR})) {
&Error("Thread type object $self->{NAME} destroyed with error #" .
$self->{ERROR});
} else {
&Debug("Thread object $self->{NAME} destroyed");
}
&Debug("Thread object $self->{NAME} status: $! - $? - $@")
if ( $! or $? or $@ );
}
# Shut down both ends of the socket pair, but only for a never forked object
# (no TID) or in the main process
map {
shutdown( $_, 2 )
} grep {
defined($_) and (! $self->{TID} or $$ == $maintid)
}
( $self->{SON} , $self->{DADDY} );
&Debug(__PACKAGE__." T".($self->{TID}?$self->{TID}:$$)." memory cleaned");
}
############################### Dedicated socket communication members
# Non-blocking read on the given socket.
# Whatever could be read is split on newlines and queued in the buffer kept
# for that socket; the oldest queued line is returned (undef when the queue
# is empty). Lines recognized as a comTEST com are not queued: they are
# acknowledged at once with AnswerComDone.
# The COM lock is taken while reading when COM locking is enabled.
sub ReadSock {
&TIMESTAT('ReadSock');
my $self = shift ;
my $Socket = shift ;
# Lock file handle, only known once PrintSock has opened it for this socket
my $fh = $lockfh{$Socket};
@{$ReadBuffer{$Socket}} = () unless (defined($ReadBuffer{$Socket}));
my $locked = 0 ;
my $size = 0 ;
my ( $bigeventcheck , $bigevent ) = ( 1 , 0 );
my ( $sysreadbuffer, $buffer ) = ( "" , "" );
$Socket->blocking(0);
$locked = &TakeComLocking($fh) if ($COM_LOCKING and $maintid);
$! = 0 ;
# NOTE(review): "$size += sysread(...)" always yields a defined number, so
# this defined() test and the READSOCK-ERR stat below can apparently never
# detect a failed read - confirm
while (defined($size+=sysread($Socket, $sysreadbuffer, $MAX_BUFFER_SIZE))) {
# Nothing read at all
last unless $size ;
$buffer .= $sysreadbuffer ;
# bigread-event: Handle case we have fulfilled our read buffer, we must
# complete with next read so loop to next sysread
# Test modulo bigeventcheck to handle the case we fulfill more than once
# NOTE(review): $bigeventcheck starts at 1 while a read of exactly
# $MAX_BUFFER_SIZE bytes gives a modulo of 0, which leaves the loop
# - confirm this matches the intent described above
last unless ( $size % $MAX_BUFFER_SIZE == $bigeventcheck );
&UPSTAT('BIGREAD-EVENT'.&SelfStatId($self,'full'));
$bigeventcheck = ++ $bigevent ;
$sysreadbuffer = "" ;
}
# Don't stats BUSY resource (errno 11, presumably EAGAIN on Linux)
&UPSTAT('errno-'.int($!)."-$!") if ( $! and $! != 11 );
if ($buffer) {
my @lines = split(/\n/,$buffer);
my $count = scalar(@lines) ;
my $check = comTEST ;
# Check buffer could contain a comTEST event as test could involve
# performance issue
if ( $buffer =~ /$check/ ) {
my @test ;
# Check really now there's a formatted comTEST, and drop such lines
@lines = grep {
@test = &IsCom( comCOM , $_ );
$test[1] !~ /^$check$/
} @lines ;
# At least one comTEST line was dropped: acknowledge it
if (@lines != $count ){
&UPSTAT('GET-COMTEST'.&SelfStatId($self));
&Debug("Received comTEST for " . $self->getName)
if $ADVANCED_DEBUGGING ;
$self->AnswerComDone ;
}
}
# Keep lines in socket dedicated buffer
push @{$ReadBuffer{$Socket}} , @lines ;
# Keep some monitoring scalar
$STATS{'READ-LINES-DUMP'.&SelfStatId($self)} = "@lines" if $bigevent ;
&MAXSTAT('MAX-READ-LINES',scalar(@lines));
&MAXSTAT('MAX-READSIZE',$size);
&MAXSTAT('MAX-READBUFFER-LINES',scalar(@{$ReadBuffer{$Socket}}));
}
&UPSTAT('READSOCK-ERR-'.$!.&SelfStatId($self,'full'))
unless (defined($size));
&ReleaseComLocking($fh) if ( $locked );
&TIMESTAT('ReadSock');
return shift @{$ReadBuffer{$Socket}} ;
}
# Write the given messages on the socket through RealPrintSock and return
# its result. When COM locking is enabled the write is done while holding
# the lock on the "<maintid>-<kind>.comlock" temporary file, which is opened
# on first use and then kept opened for the life of the process.
sub PrintSock {
my $self = shift ;
&UPSTAT('PRINTSOCK-CALL'.&SelfStatId($self));
my $Socket = shift ;
my $locked = 0 ;
# Return printing immediately unless we should use com locking
unless ($COM_LOCKING and $maintid) {
# maintid is always set after perl is loaded
my $Ret = &RealPrintSock( $self, $Socket, @_ );
return $Ret ;
}
# Only open lockfile one time by process, and keep it opened
my $fh = $lockfh{$Socket} ;
unless (defined($fh)) {
$SocketProc{$Socket} = "$maintid-" . $self->getKind()
unless (defined($SocketProc{$Socket}));
my $lockfile = &GetTmpFile("$SocketProc{$Socket}.comlock");
# The handle is also indexed by file name so that sockets sharing the same
# lock file share the same handle
if (defined($lockfh{$lockfile})) {
$lockfh{$Socket} = $lockfh{$lockfile} ;
} else {
# Create the lock file when missing (result not checked here, the open
# below reports any problem)
unless ( -e $lockfile ) {
open $lockfh{$Socket} , ">" . $lockfile ;
close($lockfh{$Socket});
}
# On failure forget the handle and leave with the debugdev return value
open $lockfh{$Socket} , "+<" . $lockfile
or delete $lockfh{$Socket},
return &debugdev("Can't open '$lockfile' com lock file: $!");
$lockfh{$lockfile} = $lockfh{$Socket} ;
}
$fh = $lockfh{$Socket} ;
}
$locked = &TakeComLocking($fh);
my $Ret = &RealPrintSock( $self, $Socket, @_ );
&ReleaseComLocking($fh) if ( $locked );
#&TIMESTAT('PrintSock');
return $Ret ;
}
# Queue the given lines (array reference) in the object output buffer and
# return the buffer reference to write from.
# Urgent coms are queued first and the buffer is always returned for them;
# otherwise an empty array reference is returned while the remote buffer
# may be full. An empty array reference is also returned without socket.
sub Bufferize {
    my ( $self, $Socket, $ref ) = @_ ;
    return [] unless (defined($Socket));
    my $bufname = 'BUFFER-LINES'.&SelfStatId($self) ;
    &MAXSTAT($bufname."-LINES",scalar(@{$self->{COMBUFFER}}));
    # Bufferize
    if ($self->{URGENTCOM}) {
        unshift @{$self->{COMBUFFER}} , @{$ref} ;
        return $self->{COMBUFFER} ;
    }
    push @{$self->{COMBUFFER}} , @{$ref} ;
    # Test if remote buffer may be full to return empty array ref
    if ( &RemoteIsFull($self,$Socket) ) {
        return [] ;
    }
    return $self->{COMBUFFER} ;
}
# Estimate whether writing would overflow the remote read buffer.
# The estimate adds the length of the given lines (optional array
# reference), the length of the first buffered line plus 1, the bytes
# already written on the socket and a 4096 margin. Returns that estimate
# when it is over $MAX_BUFFER_SIZE, else 0. Always 0 for urgent coms or
# without socket.
sub RemoteIsFull {
    my ( $self, $Socket, $ref ) = @_ ;
    return 0 if ($self->{URGENTCOM});
    return 0 unless (defined($Socket));
    # Lines are joined without separator to compute their length
    local $" = '' ;
    my $len = 0 ;
    $len = length("@{$ref}") if defined($ref) ;
    if ( defined($self->{COMBUFFER}) and @{$self->{COMBUFFER}} ) {
        $len += length($self->{COMBUFFER}->[0]) + 1 ;
    }
    $len += $BUFLEN{$Socket} + 4096 ;
    return 0 unless ( $len > $MAX_BUFFER_SIZE );
    &UPSTAT('REMOTE-FULL-BUFFER-CHECK'.&SelfStatId($self,'full'));
    return $len ;
}
# Add a byte count to the written size kept for the socket and publish the
# new total in the statistics.
sub RemoteBufSizeUpdate {
    my ( $self, $Socket, $written ) = @_ ;
    $BUFLEN{$Socket} += $written ;
    $STATS{'CUR-BUFFER-CONTROL'.&SelfStatId($self,'full')} = $BUFLEN{$Socket} ;
}
# Time of the last comTEST sent on each socket (see RealPrintSock)
my %ComTest = () ;
# Reset the written size kept for the socket (SON socket by default) and
# forget its last comTEST time.
sub ResetRemoteBufSize {
    my ( $self, $Socket ) = @_ ;
    defined($Socket) or $Socket = $self->{SON} ;
    $BUFLEN{$Socket} = 0 ;
    defined($ComTest{$Socket}) and delete $ComTest{$Socket} ;
}
# Suffix used in statistic names: empty outside of the main process or
# without object, else '-' followed by the thread kind, or by the thread
# name when any extra argument is given.
sub SelfStatId {
    my ( $self, @full ) = @_ ;
    return "" unless ( defined($self) and $$ == $maintid );
    my $id = @full ? $self->getName : $self->getKind ;
    return '-' . $id ;
}
# Put lines that could not be sent back in front of the buffer and return
# how many they were.
sub RetryLater {
    my ( $self, $buffer, $print ) = @_ ;
    &UPSTAT('SOCK-RETRYLATER-EVENT'.&SelfStatId($self));
    unshift @{$buffer}, @{$print} ;
    my $requeued = scalar(@{$print}) ;
    return $requeued ;
}
# Write buffered coms on the socket, with flow control.
# The given messages are first queued with Bufferize, then lines are taken
# from the buffer and written with syswrite. When the estimate made by
# RemoteIsFull says the remote read buffer may be full, a comTEST com is
# added at most once per second: the remote side acknowledges it with
# comDONE, which resets the accounting (see ResetRemoteBufSize).
# Lines that can't be sent are put back in the buffer by RetryLater.
# Returns 0 when nothing was written nor delayed, minus the number of lines
# still buffered when some remain, else the number of lines handled or the
# total value.
sub RealPrintSock {
&TIMESTAT('RealPrintSock');
my $self = shift ;
my $Socket = shift ;
# Below syntax is right but involve a bug
#my $urgent = (defined($self) and exists($self->{URGENTCOM})) ?
# $self->{URGENTCOM} : 0 ;
# This one is the last from a2p v1.020 without the bug
# NOTE(review): "=" binds tighter than "and", so $urgent only receives
# defined($self); the comments above indicate this form is kept on purpose
my $urgent = defined($self) and $self->{URGENTCOM} ;
my $count ;
# Lines are joined with a newline when interpolated in the written string
local $" = "\n" ;
my ( $total , $lines , $test ) = ( 0 , 0 , 0 );
my @print = () ;
$! = 0 ;
my $buffer = &Bufferize( $self, $Socket , \@_ );
while ( @{$buffer} or &RemoteIsFull( $self, $Socket ) ) {
# Take the next buffered line
if ( @{$buffer} ) {
$lines ++;
my $line = shift @{$buffer} ;
push @print , $line ;
}
$test = &RemoteIsFull( $self, $Socket, \@print );
#$test = $self->{URGENTCOM} unless ( $test or ! defined($self));
# Write now when the buffer is empty, the remote may be full or $urgent
if ( ! @{$buffer} or $test or $urgent ) {
my $tries = 0 ;
# Add comTEST to COM
if ( $test ) {
if (! defined($ComTest{$Socket})
or &tv_interval($ComTest{$Socket}) >= 1) {
# Send again comTEST after 1 second
local $" = "' '" ;
my $comtest = &GetCom( comCOM, $$ , comTEST );
$ComTest{$Socket} = [ &gettimeofday() ] ;
&UPSTAT('ADD-COMTEST-EVENT'.&SelfStatId($self));
if ( $test >= $MAX_BUFFER_SIZE ) {
&UPSTAT('KEEP-COMTEST-EVENT'.&SelfStatId($self));
# Put the last taken line back so there is room for the comTEST
if (@print and ! $urgent) {
my $print = pop @print ;
$lines -- ;
unshift @{$buffer}, $print ;
&debugdev("Retry later '$print'" . ( @print ?
" keeping '@print'":"") .
" to be sure to not fullfill remote buffer" .
" with '$comtest'")
if ( $ADVANCED_DEBUGGING );
}
} elsif ( $ADVANCED_DEBUGGING ) {
&debugdev("Sending '$comtest'" .
(@print?" added to '@print'":""));
}
# Add comtest to print list
push @print, $comtest ;
} elsif (@print) {
&UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
# Last comTEST was sent recently, we should better directly
# keep the com and retry it later
$lines -= &RetryLater($self,$buffer,\@print);
$total = -1 unless ($total);
last ;
} else {
&UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
# Remote buffer can be full and we have recently sent a
# comTEST and there's nothing to send, so just leave
$total = -1 ;
last ;
}
}
# Wait when socket is busy
my $print = "@print" . $" ;
while (!defined($count = syswrite($Socket, $print))
and $! == EAGAIN ) {
&UPSTAT('BUSY-SOCKET-EVENT'.&SelfStatId($self));
last if ( $tries ++ > 1000 );
usleep $USLEEP ;
last if $MUSTQUIT ;
&Debug("$Socket socket seems busy with errno: $!")
unless ( $tries % 20 );
}
if ( $tries ) {
local $" = ' ' ;
&MAXSTAT('CHARSOCK-RETRY',$tries);
$STATS{'CHARSOCK-RETRY'.&SelfStatId($self)} = "@print" ;
}
if (defined($count)) {
if ($count) {
# Account the written bytes for the flow control
&RemoteBufSizeUpdate( $self, $Socket, $count );
$total += $count ;
&MAXSTAT('CHARSOCK-COUNT',$count);
&UPSTAT('LINES-SOCK'.&SelfStatId($self,'full'),$lines);
} elsif ($lines) {
# Nothing was written: keep the lines for a later call
&UPSTAT('LINES-SOCK-NOTSENT'.&SelfStatId($self,'full'),
$lines);
$lines -= &RetryLater($self,$buffer,\@print);
$total = -1 unless ($total);
last ;
}
last if $test ;
} elsif ($! == EPIPE) {
# The other end is gone
close($Socket);
if ($self->{TID}) {
$loggertid = 0 if ($self->getKind eq 'Logger');
&Warn("Communication has been reset with ".$self->getName);
$do_check ++ ;
} elsif ( $Socket == $self->{DADDY} ) {
$loggertid = 0 ;
$MUSTQUIT ++ ;
&Warn("Must quit as my socket to parent has been reset");
}
} else {
# Any other write error: keep the lines for a later call
local $" = "' '" ;
&Debug("$Socket socket is busy, will retrying '@print' later");
$STATS{'SOCKET-ERROR-'.int($!).&SelfStatId($self)} =
"Can't send '@print'";
$lines -= &RetryLater($self,$buffer,\@print);
$total = -1 unless ($total);
last ;
}
@print = () ;
}
last if $MUSTQUIT ;
}
&TIMESTAT('RealPrintSock');
if ($total) {
&MAXSTAT('MAX-TOTALSOCK'.&SelfStatId($self),$total);
&MAXSTAT('MAX-LINESOCK'.&SelfStatId($self),$lines);
return @{$buffer} ? - @{$buffer} : ( $lines ? $lines : $total );
}
return 0 ;
}
# At process end, when COM locking is enabled: close all locking file
# handles and clean any locking file. %lockfh keys are sockets or lock file
# names; only keys matching $SERVICE_TMP and naming an existing file are
# removed.
END {
    if ( $COM_LOCKING ) {
        foreach my $key ( keys(%lockfh) ) {
            close($lockfh{$key}) ;
            unlink $key if ( $key =~ /^$SERVICE_TMP/ and -e $key ) ;
        }
    }
}
# Trace the module load with its version
&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
# A module file must return a true value
1;