#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: Thread.pm 3 2007-10-18 16:20:19Z guillaume $
#
package A2P::Thread;
use strict ;
use Socket ;
use Errno qw(:POSIX);
use Fcntl ':flock' ;
use IO::Socket ;
use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
use POSIX qw(:signal_h :sys_wait_h setsid) ;
use A2P::Globals ;
use A2P::Syslog  ;
use A2P::threads ;
use A2P::threads qw( threadSleep reduceSleep ) ;
use A2P::Signal  ;
use A2P::Signal  qw( LogSigMessage ) ;
use A2P::Globals qw( UpdateSharedEnv ) ;
use A2P::Com qw( comREQ comCOM comJOB comUPD comDONE comINF comXML comTEST
                 IsCom GetCom  TakeComLocking ReleaseComLocking GetTmpFile );
use A2P::JobStatus 'a2pjobstate' ;
# Module version, set at compile time: the digits are extracted from the
# revision-control "Rev" keyword string below.
BEGIN {
    our $VERSION = sprintf "%s", q$Rev: 1399 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;
# Hashes used for communications (file-private state)
my %ReadBuffer = () ; # One array per socket, created on demand by ReadSock
my %lockfh     = () ; # Handle passed to TakeComLocking, indexed by socket
my %SocketProc = () ; # "<maintid>-<kind>" string per socket, set by Fork
my %BUFLEN     = () ; # Dumped by commonloopdebug, reset in a forked child
# Object currently running ForkedLoop in this process; read by the END block
my $THIS ;
# Constructor. Must be called on an A2P::<Kind> class name: <Kind> becomes
# the thread kind and is appended to $Progname to build the thread name.
# Returns the new blessed object; sockets are created later by init().
sub new {
    my $class = shift ;
    &Debug("new $class v$VERSION");
    # Capture in list context: when the class name does not match, the kind
    # is undef instead of a stale $1 left over from an earlier regex.
    # init() then rejects the object as an invalid thread kind.
    my ( $kind ) = $class =~ /^A2P::(.*)$/ ;
    my $self = {
        KIND       =>  $kind,
        NAME       =>  $Progname . '-' . ( defined($kind) ? $kind : '' ) ,
        SIGTERM    =>  0,
        SIGKILL    =>  0,
        TOPING     =>  0,
        LOGGER     =>  0,
        BUSY       =>  [ 0, "" ],
        DO_QUIT    =>  0,
        DO_STAT    =>  [ &gettimeofday() ],
        CanPING    =>  [ &gettimeofday() ],
        HAVE_STATS =>  0,
        STATS      =>  [],
        DONTSLEEP  =>  0,
        URGENTCOM  =>  0,
        COMBUFFER  =>  [],
        JOB        =>  ""
    };
    return bless $self , $class ;
}
# Validate the thread kind and create the parent/child socket pair.
# Returns 1 on success; on failure records the error through ThreadError
# and returns its (false) value.
sub init {
    my ( $thread ) = @_ ;
    &Debug("new A2P::Thread is " . $thread->getName);
    my $kind = $thread->{KIND} ;
    unless (defined($SonKind{$kind})) {
        return $thread->ThreadError("1, '$kind' is not a valid thread kind");
    }
    unless (socketpair( $thread->{SON}, $thread->{DADDY},
                        AF_UNIX, SOCK_STREAM, PF_UNSPEC )) {
        return $thread->ThreadError("2, Can't open socket pair for " .
            $kind . " thread: $!");
    }
    # Both ends get IO::Socket methods (blocking(), ...)
    foreach my $end ( $thread->{SON}, $thread->{DADDY} ) {
        bless $end , "IO::Socket" ;
    }
    &Debug($thread->getName . " object is ready");
    return 1 ;
}
# Optional hash reference of abort messages indexed by error code; consulted
# by ABTERM when defined.
our $AbortMessages ;
# Default list of leading error lines for ABTERM: none.
sub defautABTERMError {
    my @none = () ;
    return @none ;
}
# Report an abnormal termination.
# An all-digit first argument is taken as the error code; otherwise the
# caller's line number is used as the code. The message comes from
# $AbortMessages when it has an entry for that code (used as a sprintf
# format with the remaining arguments for codes >= 100), else the remaining
# arguments are the message lines. Always returns 0.
sub ABTERM {
    my $thread = shift ;
    my ( undef, undef, $line ) = caller ;
    my $code = $_[0] =~ /^\d+$/ ? shift : $line ;
    @AbortTime = &gettimeofday();
    my @lines = $thread->defautABTERMError() ;
    if ( defined($AbortMessages) and defined($AbortMessages->{$code}) ) {
        if ( $code < 100 ) {
            push @lines, $AbortMessages->{$code} ;
        } else {
            push @lines, sprintf( $AbortMessages->{$code} , @_ ) ;
        }
    } else {
        push @lines, @_ ;
    }
    &Error( @lines );
    # Return information to JobManager if JOB is defined and we are not the
    # JobManager ourselves
    if (defined($thread->{JOB}) and ! $thread->is('JobManager')) {
        foreach my $text ( @lines ) {
            $thread->Return(
                $thread->{JOB} => &GetCom( comINF, 'ABTERM', $text ));
        }
    }
    # Record the abort in the job status when a step is known
    if (defined($thread->{JOB}) and defined($thread->{STEP})) {
        foreach my $text ( @lines ) {
            &a2pjobstate( $thread->{JOB}, $thread->{STEP}, 'A',
                { ABTERM => $text } );
        }
    }
    &UPSTAT('JOB-ABTERM');
    return 0 ;
}
# Debugging helper: return the BUSY details (everything after the first
# element) joined with spaces.
sub getBusyReq {
    my ( $thread ) = @_ ;
    my ( undef, @details ) = @{$thread->{BUSY}} ;
    return "@details" ;
}
# Replace the BUSY state with the given list; the first element is the
# busy owner (0 when ready), the rest are free-form details.
sub setBusy  {
    my ( $thread, @state ) = @_ ;
    @{$thread->{BUSY}} = @state ;
    &UPSTAT('SET-BUSY-SON') if ( $state[0] );
}
# With an argument: true when the busy owner numerically equals it.
# Without argument: return the busy owner itself.
# {BUSY} is initialized to [ 0 , "" ] at object creation.
sub isBusyBy {
    my ( $thread, @who ) = @_ ;
    my $owner = $thread->{BUSY}->[0] ;
    return @who ? $owner == $who[0] : $owner ;
}
# Mark the thread as ready (not busy).
sub setReady {
    my ( $thread ) = @_ ;
    &UPSTAT('SET-READY-SON');
    $thread->setBusy( 0, "" );
}
# Self-test dump, run when $do_test is set (see mySigUSR1).
# Appends to files under /tmp, each written under an exclusive flock:
#   - the non-zero counters of %STATS        (...-selftest.log)
#   - the lines returned by getBackLogs()    (...-selfdebug.log)
#   - the content of $self->{REQUESTS_ARRAY} (...-selfREQUESTS_ARRAY.log)
# Returns early with the value of Error() when a log file can't be opened.
sub self_TEST {
    my $self = shift ;
    $do_test = 0 ;
    # NOTE(review): @TT is never assigned in the visible code; with an empty
    # array this presumably measures from the epoch, so that subtracting $^T
    # yields the process run time - confirm.
    our @TT ;
    $STATS{'RUNNING'} = sprintf("%.01fs", &tv_interval(\@TT) - $^T);
    # Output current memorized dynamic statistics
    my @stats = grep { $STATS{$_} } sort(keys(%STATS)) ;
    my $tag = $maintid ? "-$maintid" : "" ;
    if (@stats) {
        open my $log, ">>", "/tmp/$Progname$tag-selftest.log"
            or return &Error("Can't open log file in /tmp for self-test: $!");
        # Don't check anything as this is not critical
        flock($log, LOCK_EX);
        foreach my $stat ( @stats ) {
            print {$log} sprintf("%50s %s", $stat, $STATS{$stat}), "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
    # Output current memorized log lines
    my @logs = &getBackLogs();
    if ( @logs ) {
        open my $log, ">>", "/tmp/$Progname$tag-selfdebug.log"
            or return &Error("Can't open log file in /tmp for self-debug: $!");
        flock($log, LOCK_EX);
        foreach my $debug ( @logs ) {
            print {$log} $debug, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
    # Output other things.
    # The list must be parenthesized: using a bare qw() as the foreach list
    # is a syntax error since perl 5.18.
    foreach my $Array ( qw( REQUESTS_ARRAY ) ) {
        next unless (defined($self->{$Array}));
        # Work on a copy so that flattening below doesn't alter the object
        my @Array = @{$self->{$Array}} ;
        next unless (@Array);
        my $ii = 0 ;
        my $li = "[%0" . scalar(@Array) . "d]: " ;
        open my $log, ">>", "/tmp/$Progname$tag-self$Array.log"
            or return &Error("Can't open log file in /tmp for " . $Array
            . " array: $!");
        flock($log, LOCK_EX);
        foreach my $val ( @Array ) {
            # Array references are flattened on one line
            $val = ref($val) =~ /^ARRAY/i ? join('',@{$val}) : $val ;
            print {$log} $Array, sprintf($li,$ii++), $val, "\n" ;
        }
        flock($log, LOCK_UN);
        close($log);
    }
}
# Remove this thread from the %sons list, indexed by its TID.
sub Delete {
    my ( $thread ) = @_ ;
    my $tid = $thread->{TID} ;
    unless (defined($thread->{TOCLEAN})) {
        &Debug("Deleting T$tid thread from sons list");
    }
    # The entry is overwritten before the key is removed
    $sons{$tid} = 0 ;
    delete $sons{$tid} ;
    undef $thread
}
# Log an error prefixed by the thread name and the caller's file basename
# and line. Always returns 0.
sub ThisError {
    my ( $thread, $text ) = @_ ;
    my ( undef, $path, $lineno ) = caller ;
    # Keep only the last path component in $1
    $path =~ m|([^/]+)$| ;
    my $where = "from $1, L.$lineno" ;
    &Error($thread->getName . ": $where: " . $text);
    return 0 ;
}
# Default per-loop debug output: dump every %BUFLEN entry.
sub commonloopdebug {
    my $self = shift ;
    my @sizes = map { "$_ -> $BUFLEN{$_}" } keys(%BUFLEN) ;
    &Debug("Current BUFLEN:", @sizes);
}
# loopdebug is the name called by ForkedLoop; by default it is an alias
# to commonloopdebug
sub loopdebug ;
*loopdebug = \&commonloopdebug ;
# Memorize an error text on the object and return 0, so that callers can
# write "return $self->ThreadError(...)".
sub ThreadError {
    my ( $thread, $error ) = @_ ;
    $thread->{ERROR} = $error ;
    return 0 ;
}
# Flag the object as "to clean": Delete and DESTROY then skip their
# logging and checks.
sub setToClean {
    my ( $thread ) = @_ ;
    return $thread->{TOCLEAN} = 1 ;
}
# Only called from parent: tell the forked thread which TID is now the
# logger. Returns 0 when nothing has to be done (same logger, or no TID).
sub SwitchLogger {
    my ( $thread, $logger ) = @_ ;
    return 0 if ( ! $thread->{TID} or $thread->{LOGGER} == $logger );
    &Debug("Switching logger to T$logger for " . $thread->getName);
    $thread->{LOGGER} = $logger ;
    $thread->AskThread( _INIT , "$logger" );
}
# Accessor: the thread name used in log messages.
sub getName {
    my ( $thread ) = @_ ;
    return $thread->{NAME} ;
}
# Accessor: the thread kind.
sub getKind {
    my ( $thread ) = @_ ;
    return $thread->{KIND} ;
}
# Return the arguments equal to this thread's kind (their count in scalar
# context), so it is true when the thread is of one of the given kinds.
sub is {
    my $kind = $_[0]->getKind() ;
    # The whole @_ is scanned, invocant included, as a string comparison
    return grep { $kind eq $_ } @_ ;
}
# Return 1 once Fork has recorded its fork time, else 0.
sub isForked {
    my ( $thread ) = @_ ;
    return 1 if (defined($thread->{FORKED}));
    return 0 ;
}
# Probe the forked process with signal 0. Returns 1 when the signal could
# be sent, 0 otherwise or when no TID is known.
sub isAlive {
    my ( $thread ) = @_ ;
    return 0 unless ( $thread->{TID} );
    # TODO: check errno when kill fails (original note: see p. 371, Linux PT)
    my $alive = kill( 0 , $thread->{TID} ) ? 1 : 0 ;
    &Debug( $thread->getName . ( $alive ? " seems alive" : " seems dead" ) );
    return $alive ;
}
# A thread is to check when it is not quitting, was last checked more than
# 15 seconds ago and can be pinged now.
sub isToCheck {
    my ( $thread ) = @_ ;
    return 0 if (defined($thread->{QUITTING}));
    return 0 unless ( $thread->LastCheck() > 15 );
    return $thread->CanPing() ;
}
# Memorize and flush statistics lines.
# When arguments are given, a dated entry (YYYYMMDD, HHMMSS, myStats values)
# is queued in {STATS}. Queued entries are appended to the file
# "$STATS_FOLDER/<thread name>-<YYYYMMDD>" when more than one second has
# passed since the last flush, or when the thread is quitting.
# Does nothing unless {HAVE_STATS}, $KEEP_STATS and the folder are set.
# On an I/O failure returns the value of ThreadError() and keeps the queue.
sub KeepStats {
    my $self = shift ;
    return unless ( $self->{HAVE_STATS} and $KEEP_STATS and -d $STATS_FOLDER );
    if (@_) {
        my @time = localtime ;
        push @{$self->{STATS}}, [
            sprintf('%4d%02d%02d', 1900+$time[5], $time[4]+1, $time[3]),
            sprintf('%02d%02d%02d', $time[2], $time[1], $time[0])
            , $self->myStats(\@_) ];
    }
    # Only keep stat after at least 1 second has passed, and there's some
    # But keep output stats if we are quitting
    return unless ( @{$self->{STATS}}
        and ($self->{DO_QUIT} or &tv_interval($self->{DO_STAT}) > 1));
    # Open and locked handles, indexed by stats file name
    my %Locked = () ;
    foreach my $Stat ( @{$self->{STATS}} ) {
        # Compute filename
        my $statfile = $self->getName() . '-' . $Stat->[0] ;
        unless ( $Locked{$statfile} ) {
            # Use a fresh lexical handle for each file: re-opening a shared
            # handle would implicitly close the previous file and make every
            # %Locked entry point to the last opened one.
            open my $fh, '>>', "$STATS_FOLDER/$statfile"
                or return $self->ThreadError("Can't open stats file '" .
                    $statfile . "': $!");
            # Try exclusive lock
            unless (flock($fh, LOCK_EX)) {
                close($fh);
                return $self->ThreadError("Can't lock stats file '" .
                    $statfile . "': $!");
            }
            if ( defined($self->{STATHEADERS}) and @{$self->{STATHEADERS}} ) {
                # Force again headers output after a restart, in case they
                # have changed
                print {$fh} join(';', @{$self->{STATHEADERS}}), "\n"
                    or return $self->ThreadError("Can't output in file '" .
                        $statfile . "': $!");
                @{$self->{STATHEADERS}} = () ;
            }
            $Locked{$statfile} = $fh ;
        }
        my $File = $Locked{$statfile} ;
        print {$File} join(';', @{$Stat}), "\n"
            or return $self->ThreadError("Can't output in statfile '" .
                $statfile . "': $!");
    }
    $self->{STATS} = [ ] ;
    # Unlock and close stat files; always close, even if unlocking failed
    foreach my $fh ( values(%Locked) ) {
        flock($fh, LOCK_UN);
        close($fh);
    }
    # Reset stats timer
    $self->{DO_STAT} = [ &gettimeofday() ];
}
# Seconds elapsed since the last check, or 100 when never checked.
sub LastCheck {
    my ( $thread ) = @_ ;
    return 100 unless (defined($thread->{CHECKED}));
    return &tv_interval($thread->{CHECKED}) ;
}
# Count one more pending ping and restart both the check and ping timers.
# An Info line is logged each time the pending count is a multiple of 4.
sub Check {
    my ( $thread ) = @_ ;
    my $pending = ++ $thread->{TOPING} ;
    # A thread is really busy if it has not said anything for 60 seconds
    # (see isToCheck())
    if ( ! $pending or ! ( $pending % 4 ) ) {
        &Info($thread->getName . " thread seems busy since " .
          sprintf("%.2f s",$pending));
    }
    $thread->{CHECKED} = [ &gettimeofday() ];
    $thread->{CanPING} = [ &gettimeofday() ];
}
# Send a PING request when at least one ping is pending and the ping delay
# has expired.
sub Ping {
    my ( $thread ) = @_ ;
    $thread->AskThread( PING )
        if ( $thread->{TOPING} and $thread->CanPing() );
}
# A ping answer was received: one less pending ping, and both the check and
# ping timers restart from now.
sub GotPing {
    my ( $thread ) = @_ ;
    $thread->{TOPING} -- ;
    $thread->{CHECKED} = [ &gettimeofday() ];
    $thread->{CanPING} = [ &gettimeofday() ];
}
# A ping can only be repeated when the ping timer is more than 5 seconds
# old; in that case the timer is restarted and 1 is returned, else 0.
sub CanPing {
    my ( $thread ) = @_ ;
    return 0 unless (defined($thread->{CanPING}));
    return 0 unless (&tv_interval($thread->{CanPING}) > 5);
    $thread->{CanPING} = [ &gettimeofday() ] ;
    return 1 ;
}
# Send SIGTERM once, when the thread has been stopping for more than 5 and
# less than 30 seconds. Returns 0 when nothing was sent.
sub SendSigTerm {
    my ( $thread ) = @_ ;
    my $elapsed = $thread->StoppingSince();
    return 0 if ( $thread->{SIGTERM} or $elapsed <= 5 or $elapsed >= 30 );
    &Debug("Killing " . $thread->getName . " with SIGTERM");
    kill 15, $thread->{TID} ;
    return ++ $thread->{SIGTERM} ;
}
# Send SIGKILL once, when the thread has been stopping for at least 30
# seconds. Returns 0 when nothing was sent.
sub SendSigKill {
    my ( $thread ) = @_ ;
    return 0 if ( $thread->{SIGKILL} );
    return 0 if ( $thread->StoppingSince() < 30 );
    &Debug("Killing " . $thread->getName . " with SIGKILL");
    kill 9, $thread->{TID} ;
    return ++ $thread->{SIGKILL} ;
}
################################################################################
##          Fork code                                                         ##
################################################################################
# Fork the process backing this thread object.
# In the parent: registers the parent-side socket and returns the child
# pid. Returns undef when the object was already forked or fork failed.
# In the child: never returns; it resets inherited state, runs ThreadInit
# and ForkedLoop, then exits with the ForkedLoop return value.
sub Fork {
    my $self = shift;
    # Security: Only one fork call can be done by created Thread object
    return undef if ($self->isForked());
    # Classical way to fork
    $self->{TID} = fork ;
    # Return to parent early if not forked
    return undef unless (defined($self->{TID}));
    $self->{FORKED} = [ &gettimeofday() ];
    # Clean SYSLOG socket (done on both sides of the fork)
    if (defined($SYSLOG)) {
        shutdown($SYSLOG,2);
        $SYSLOG = undef ;
        &Debug("SYSLOG socket is reset");
    }
    # Return to parent
    if ( $self->{TID} ) {
        #============ In parent =============
        &Debug("Parent-to-child link is " . $self->{SON} );
        push @PIPE , $self->{SON} ;
        # Set the filename for COM locking
        $SocketProc{$self->{SON}} = "$maintid-" . $self->getKind()
            if $COM_LOCKING ;
        # Can define logger socket for parent
        &SetLogger( $self ) if ($self->is('Logger'));
        # Update object name as it is used in logging messages
        $self->{NAME} .= '[' . $self->{TID} . ']' ;
        # Return to caller in parent
        return $self->{TID} ;
    }
    #=== In child, we won't leave this object member ===
    # Clean error status
    $? = 0 ;
    $! = 0 ;
    # Clean open COM locking sockets and communications hashes
    if ( $COM_LOCKING ) {
        map { close($lockfh{$_}) } keys(%lockfh);
        %SocketProc = () ;
        %lockfh     = () ;
    }
    %ReadBuffer = () ;
    %BUFLEN     = () ;
    # Clear stats
    map { $STATS{$_} = 0 } keys(%STATS) ;
    # Update signal handling immediately,
    # Keep in mind these signal handlers won't be called as object member
    $SIG{'USR1'} = \&mySigUSR1 ;
    $SIG{'ALRM'} = \&mySigALRM ;
    $SIG{'TERM'} = \&mySigTERM ;
    $SIG{'QUIT'} = \&mySigTERM ;
    $SIG{'HUP'}  = \&mySigHUP  ;
    $SIG{'ABRT'} = \&mySigTERM ;
    # Update child process name
    $0        = $self->getName ; # Update process name with 'ps' command
    $Progname = $self->getName ; # For logging
    # Reduce priority as parent must be the one with the greater priority
    my $priority = getpriority( 0, $$ );
    if ( $priority < 15 ) {
        # Add 5 to the value read by getpriority, which lowers our
        # scheduling priority
        setpriority 0, 0, $priority + 5
            or &Warn("Can't update priority: $!");
    }
    $self->ThreadInitEarly();
    &Debug("Child " . $self->getName . " is starting...");
    &Debug("Child-to-parent link is " . $self->{DADDY} );
    push @PIPE , $self->{DADDY} ;
    # Cleaning this process from previously created objects: %sons is
    # indexed by TID and was cloned from the parent by fork
    map {
        if ( $_ != $$ ) {
            &Debug("Cleaning unused T$_ cloned object during fork");
            $sons{$_}->setToClean() ; # Just to avoid checking from DESTROY sub
            $sons{$_}->Delete();
        }
    } keys(%sons);
    &Debug("Memory cleaned");
    &Debug("Starting " . $self->getName . " T$$");
    $self->ThreadInit();
    # Main loop of the child, left on a quit request or signal
    my $Ret = $self->ForkedLoop ;
    $loggertid = 0 ; # Force to log directly
    &LogSigMessage();
    &Debug($self->getName . " ended ($MUSTQUIT;$!;$?;$@)");
    &Debug("Forced to quit ($MUSTQUIT)") if $MUSTQUIT ;
    # Notify the parent, flush pending stats and let the subclass clean up
    $self->Answer( QUIT );
    $self->KeepStats();
    $self->DoBeforeQuit();
    exit $Ret ;
}
# Unrecognized coms starting with '<', kept by ForkedLoop as [ com, count ]
# pairs while waiting for the rest of a fragmented com
my @bad_answer = () ;
# Reassembled coms queued by ForkedLoop; GetRequest returns them first
my @KeepRequest = () ;
# Main loop of a forked thread, run in the child until $MUSTQUIT or
# {DO_QUIT} is set.
# Each turn: logs pending signal messages, commits pending job status at
# least once a minute, sleeps, flushes the socket to the parent, then
# handles up to $COM_BURST requests read from the parent:
#   comREQ: QUIT, PING, _INIT (ENV update, shared vars update or logger
#           selection) and TODO (delegated to Do())
#   comUPD: direct update of a variable listed in @SHARED
#   comCOM: comDONE acknowledgment from the parent
#   comXML: delegated to the XML() method when the object has one
# Anything else is treated as a possibly fragmented com and reassembled.
# The loop also quits when the parent process can't be signaled anymore.
# Returns 1 when at least one error was counted, else 0.
sub ForkedLoop {
    my $self = $THIS = shift ;
    my $ErrCount = 0 ;
    my $request ;
    my $LoopTiming = [ &gettimeofday() ];
    my $PendingStatus = [ &gettimeofday() ];
    my $LoopCount  = 0 ;
    $! = 0 ;
    while ( ! ( $MUSTQUIT or $self->{DO_QUIT} ) ) {
        &TIMESTAT('ForkedLoop');
        &LogSigMessage();
        # Check to commit any pending job status at least each minute
        if (&tv_interval($PendingStatus) > 60) {
            &a2pjobstate ;
            $PendingStatus = [ &gettimeofday() ];
        }
        &UPSTAT('LOOPS');
        $self->{DONTSLEEP} ? &reduceSleep('no stats') : &threadSleep() ;
        # Just to flush buffer
        $self->PrintSock($self->{DADDY});
        $self->KeepStats();
        # Test thread sub, can be useful while debugging
        $self->self_TEST() if $do_test ;
        my $max = $COM_BURST ;
        while ( $max-- and
            (defined( $request = $self->GetRequest($self->{DADDY})))) {
            last if ( $MUSTQUIT or $self->{DO_QUIT} );
            chomp $request ;
            my @Req = &IsCom( comREQ , $request );
            if ( @Req == 2 ) {
                if ( $Req[0] == QUIT ) { # Do I have to quit
                    &Debug("Notified to QUIT by parent");
                    $self->{DO_QUIT} = 1 ;
                } elsif ( $Req[0] == PING ) { # Do I have to ping
                    $self->Answer( PING );
                    &Debug("PING");
                } elsif ( $Req[0] == _INIT ) { # ENV update
                    $self->Answer( _INIT );
                    if ( $Req[1] =~ m|^\s*(\$ENV{\w+}\s*=.*)$|i ) {
                        if ( $self->{clientrequest} ) {
                            # SECURITY issue:
                            # Only Listener is concerned: no external init
                            # accepted for now
                            $self->CloseClient( $self->{clientrequest}, 1,
                                "INIT is not supported from other application");
                        } else {
                            &Debug("Do INIT: $1");
                            # NOTE(review): string eval of received text;
                            # safe only as long as it can only come from
                            # the parent process
                            eval $1 ;
                        }
                    } elsif ( $Req[1] =~ m|^CAN_UPDATE_NOW$|i ) {
                        # Update shared vars with ENV definition
                        &UpdateSharedEnv ;
                        # Reset Debug behavior
                        &ResetDebug() ;
                        $self->InitUpdated();
                    } elsif ( $Req[1] =~ m|^(\d+)$|i ) {
                        # A number is the TID of the logger thread
                        if ( $1 == $$ ) {
                            &Debug("Approuved as logger");
                        } elsif ( $self->is('Logger') ) {
                            &Debug("Revoked as logger thread");
                            # We should quit as we are no more used for service
                            $self->{DO_QUIT} = 1 ;
                        } else {
                            &Debug("My logger is T$1") if $1 ;
                            $loggertid = $1 ;
                        }
                    } else {
                        $self->ThisError("Not able to evaluate '" . $Req[1] .
                            "' as ENV update");
                        $ErrCount ++ ;
                    }
                } elsif ( $Req[0] == TODO ) {
                    &Debug("Got 'TODO' COM: '$Req[1]'") if $ADVANCED_DEBUGGING ;
                    $ErrCount ++ unless $self->Do( \$Req[1] );
                    # Called to do our job so we should sleep less next time
                    &reduceSleep();
                } else {
                    $self->ThisError("[DEV] Can't decide request " .
                        (defined($means{$request})?$means{$request}:$request));
                    $ErrCount ++ ;
                }
            # TODO Test me with also $Req[0] == _UPDATE
            } elsif ( @Req = &IsCom( comUPD , $request ) ) {
                if ( $Req[1] =~ m|^(\w+)=(["'])?(.*)\2$|i ) {
                    my ( $var , $value ) = ( $1 , $3 );
                    if (defined($1) and defined($3)) {
                        # NOTE(review): the name is matched as an unanchored
                        # pattern against @SHARED and the value is injected
                        # in a string eval; both rely on the com coming from
                        # a trusted sender - confirm before exposing further
                        if ( grep { /$var/ } @SHARED ) {
                            my $apos = $value =~ /^\d+$/ ? '' : '"' ;
                            eval '$' . $var . '=' . $apos . $value . $apos ;
                            # For important thread initialization
                            $self->InitUpdated();
                            &UPSTAT($var.'_DIRECT_UPDATE');
                        } else {
                            $self->ThisError("Not authorized to update '" . $var
                                . "' to '$value'");
                            &UPSTAT('UNAUTH_DIRECT_UPDATE');
                            $ErrCount ++ ;
                        }
                    } else {
                        $self->ThisError("Bad direct update request '$Req[1]'");
                        &UPSTAT('BAD_DIRECT_UPDATE');
                        $ErrCount ++ ;
                    }
                } else {
                    $self->ThisError("Bad formated update request '$Req[1]'");
                    &UPSTAT('BADFORMAT_UPDATE');
                    $ErrCount ++ ;
                }
            } elsif (  @Req = &IsCom( comCOM , $request ) ) {
                if ( $Req[1] == comDONE ) {
                    &UPSTAT('GETCOMDONE');
                    &Debug("Received comDONE from parent")
                        if $ADVANCED_DEBUGGING ;
                    $self->ResetRemoteBufSize($self->{DADDY});
                } else {
                    $self->ThisError("[DEV] Can't handle request '$request'");
                    $ErrCount ++ ;
                }
            # Handle XML request
            } elsif ( @Req = &IsCom( comXML , $request ) ) {
                &Debug("Got an XML request");
                # Check we have XML API and call it
                if (UNIVERSAL::can( $self , 'XML' )) {
                    $self->XML($request);
                } else {
                    $self->ThisError("[DEV] Not expected to handle '" . $request
                        . "' XML request");
                    $ErrCount ++ ;
                }
            } else {
                if ( $request =~ /^[<]/ ) {
                    my $ref ;
                    &UPSTAT('COM-CHECK-FRAGMENT');
                    # Control this com is still not handled
                    if (($ref) = grep { $_->[0] eq $request } @bad_answer ) {
                        # Don't keep too much time
                        next if (++ $ref->[1] < 100);
                    } else {
                        # Will try to reassemble a fragmented com
                        push @bad_answer, [ $request, 0 ] ;
                        $max ++ ;
                        next ;
                    }
                } elsif (@bad_answer) {
                    # Try to assemble and reinject immediately
                    my $try = shift @bad_answer ;
                    my $com = $try->[0] . $request ;
                    &Info("Trying reassembled com: $com");
                    push @KeepRequest, $com ;
                    &UPSTAT('COM-REASSEMBLED');
                    $max ++ ;
                    next ;
                }
                # This is only critical if not in Logger
                if ($self->getKind() eq 'Logger') {
                    &Warn("Critical error on " . $self->getName);
                } else {
                    &Alert("Critical error on " . $self->getName);
                }
                $self->ThisError("[DEV] Can't interpret request '$request'");
                $ErrCount ++ ;
            }
        }# Loop COM_BURST
        # Check parent is alive
        unless ( kill 0, $maintid ) {
            # TODO: check errno (original note: see p. 371, Linux PT)
            $loggertid = 0 ;
            &Warn("Quitting as parent seems dead");
            $MUSTQUIT = 1 ;
        }
        { # Rating the do loop
            no integer ;
            # Counted by 100 to keep two decimals in the computed rate
            $LoopCount += 100 ;
            my $int = &tv_interval($LoopTiming);
            if ( $int >= 1 ) {
                # The rate must be computed before the counter is reset,
                # otherwise the reported rate is always 0
                &MAXSTAT('DOLOOP-RATE',int($LoopCount/$int)/100);
                $LoopCount = 0 ;
                $LoopTiming = [ &gettimeofday() ];
            }
        }
        if ( $ADVANCED_DEBUGGING ) {
            &Debug("Loop rate is " . $STATS{'DOLOOP-RATE'} );
            $self->loopdebug();
        }
        &TIMESTAT('ForkedLoop');
    }# Loop to read queue
    undef $THIS ;
    &Debug("$ErrCount counted errors") if $ErrCount ;
    return $ErrCount ? 1 : 0 ;
}
################################################################################
##          End Fork code                                                     ##
################################################################################
# Hook called in the child just before ForkedLoop; nothing to do by default.
sub ThreadInit {
    return 1 ;
}
# Early child initialization hook: directly link Log answers to parent
# (SetLogger is defined in Syslog.pm).
# This member must be overridden in Logger Object.
sub ThreadInitEarly {
    my ( $thread ) = @_ ;
    &SetLogger( $thread );
}
# Hook called by ForkedLoop after a shared variable update; nothing to do
# by default.
sub InitUpdated {
    return 1 ;
}
# Return the next request to handle: a reassembled com when one is queued,
# 1 when the thread has to quit, else whatever ReadSock returns for the
# given socket.
sub GetRequest {
    my ( $thread, $socket ) = @_ ;
    # Check if we got a fragmented not safe request
    return shift @KeepRequest if @KeepRequest ;
    return 1 if ( $MUSTQUIT or $thread->{DO_QUIT} );
    return $thread->ReadSock($socket) ;
}
# Default TODO handler: only logs the request, received as a reference to
# the request string.
sub Do {
    my ( $thread, $todo ) = @_ ;
    &Debug("Got to do '${$todo}'");
}
# Hook called in the child just before exit; nothing to do by default.
sub DoBeforeQuit {
    return 1 ;
}
# SIGUSR1 handler: memorize a debug line and request a self-test.
sub mySigUSR1 {
    my $where = __PACKAGE__ ;
    push @SigDebug, "SIGUSR1 received in " . $where ;
    $do_test ++ ;
}
# SIGALRM handler: ignored unless $SCAN_SPOOL is set; otherwise counts one
# more file to do (up to $MAXTASK) and memorizes a debug line.
sub mySigALRM {
    $SCAN_SPOOL or return ;
    if ( $do_file < $MAXTASK ) {
        $do_file ++ ;
    }
    push @SigDebug, "SIGALRM received (do_file=$do_file) in " . __PACKAGE__ ;
}
# Handler for terminating signals (TERM, QUIT, ABRT): memorize the signal
# (as a warning for ABRT, as debug otherwise) and ask the loop to quit.
sub mySigTERM {
    my ( $signame ) = @_ ;
    my $text = "SIG$signame received in " . __PACKAGE__ ;
    if ( $signame eq 'ABRT' ) {
        push @SigWarn, $text ;
    } else {
        push @SigDebug, $text ;
    }
    $MUSTQUIT ++ ;
    $NO_SYSLOG_DEBUG = 0 if $SERVICE_DEBUG ;
}
# SIGHUP handler: only memorize a debug line.
sub mySigHUP {
    my $where = __PACKAGE__ ;
    push @SigDebug, "SIGHUP received in " . $where ;
}
# SIGPIPE handler: memorize where the signal was received and ask to quit
# when no socket is left defined in @PIPE.
sub mySigPIPE {
    my ( $signame ) = @_ ;
    my ( $package, undef, $line ) = caller();
    push @SigDebug , "SIG$signame received at L. $line in $package: $! $?" ;
    my $available = grep { defined($_) } @PIPE ;
    $MUSTQUIT ++ unless ( $available );
}
# Return the values to keep as statistics from the given array reference;
# by default all of them, unchanged.
sub myStats {
    my ( undef, $values ) = @_ ;
    return @{$values} ;
}
# Send a comREQ request built from the arguments to the forked thread,
# through the parent side socket ({SON}).
# At most 10 arguments are accepted. Request lengths are accounted in the
# REQUEST-LEN-* statistics, and a request longer than 9999 characters is
# also dumped in a "lastbigreq" file for diagnostic.
# Returns the value of PrintSock(), or of Error() on too many arguments.
sub AskThread {
    my $self   = shift ;
    my $Asking = $self->{SON} ;
    if (@_ > 10) {
        return &Error("[DEV] Too large argument for AskThread: @_");
    }
    my $msg = &GetCom( comREQ , @_ );
    {
        my $len = length($msg);
        &MAXSTAT('REQUEST-LEN', $len);
        if ( $len < 50 ) {       &UPSTAT('REQUEST-LEN-000-049');
        } elsif ( $len < 100 ) { &UPSTAT('REQUEST-LEN-050-099');
        } elsif ( $len < 200 ) { &UPSTAT('REQUEST-LEN-100-199');
        } elsif ( $len < 500 ) { &UPSTAT('REQUEST-LEN-200-499');
        } elsif ( $len < 1000) { &UPSTAT('REQUEST-LEN-500-999');
        } else {
            &UPSTAT('REQUEST-LEN-1000+');
            if ( $len > 9999 ) {
                # The dump is only a diagnostic help: a failure to write it
                # is reported but must not prevent the request to be sent
                my $logfile = &GetTmpFile("$Progname-lastbigreq.log") ;
                if ( open my $log, '>', $logfile ) {
                    print {$log} $msg, "\n" ;
                    close($log);
                } else {
                    &Error("Can't open big req log file in TMP: $!");
                }
            }
        }
    }
    &UPSTAT("SENDLINES-".$self->{KIND});
    return $self->PrintSock( $Asking, $msg ) ;
}
# Answer to the parent that the given job is done.
sub AnswerDone {
    my ( $thread, $job ) = @_ ;
    my @done = &GetCom( comDONE ) ;
    $thread->Answer( &GetCom( comJOB , $job , @done ));
}
# Send a comDONE acknowledgment to the peer: through {SON} when running in
# the main process, through {DADDY} otherwise. {URGENTCOM} is raised during
# the send.
sub AnswerComDone {
    my ( $thread ) = @_ ;
    my $socket = ( $$ == $maintid ) ? $thread->{SON} : $thread->{DADDY} ;
    $thread->{URGENTCOM} = 1 ;
    if (!defined($socket)) {
        $thread->ThisError("Can't answer comDONE: socket not available");
    } else {
        # Reset errno so that only an error set by the send is reported
        $! = 0 ;
        unless ($thread->PrintSock($socket,&GetCom( comCOM , $$ , comDONE ))) {
            $thread->ThisError("Can't answer comDONE".(($!)?": $!":""));
        }
    }
    $thread->{URGENTCOM} = 0 ;
}
# Send job information to the JobManager, as a comJOB com wrapped in a
# request addressed to 'JobManager'.
sub Return {
    my ( $thread, @info ) = @_ ;
    my @job = &GetCom( comJOB , @info ) ;
    $thread->Answer( &GetCom( comREQ , JobManager => @job ));
}
# Send an answer to the parent through the {DADDY} socket, as a comCOM com
# tagged with our pid. Several arguments are first packed as a comJOB com.
# Answer lengths are accounted in the Answer-LEN-* statistics. An answer
# longer than $MAX_BUFFER_SIZE - 4096 is not sent: it is appended to a
# "badcom" file and an error is logged.
# Returns the true value of PrintSock() when sent, 0 when the send failed,
# or the value of UPSTAT()/Error() when nothing could be sent.
sub Answer {
    my $self    = shift ;
    my $Answers = $self->{DADDY} ;
    return &UPSTAT('Answer-ERROR') unless ( defined($Answers) and @_ );
    my $msg = &GetCom( comCOM, $$ => ( @_ > 1 ? &GetCom( comJOB, @_ ): $_[0] ));
    {
        my $len = length($msg);
        &MAXSTAT('Answer-MAXLEN',$len);
        if ( $len < 50 ) {       &UPSTAT('Answer-LEN-000-049');
        } elsif ( $len < 100 ) { &UPSTAT('Answer-LEN-050-099');
        } elsif ( $len < 200 ) { &UPSTAT('Answer-LEN-100-199');
        } elsif ( $len < 500 ) { &UPSTAT('Answer-LEN-200-499');
        } elsif ( $len < 1000) { &UPSTAT('Answer-LEN-500-999');
        } else {                 &UPSTAT('Answer-LEN-1000+'); }
        if ( $len > $MAX_BUFFER_SIZE - 4096 ) {
            &UPSTAT("BADCOM-LEN-$len");
            # Lexical handle and 3-arg open: no shared bareword handle, and
            # the file name is never parsed for an open mode
            my $logfile = &GetTmpFile("$Progname-$maintid-badcom.log") ;
            open my $log, '>>', $logfile
                or return &Error("Can't open badcom log file in TMP: $!");
            print {$log} $msg, "\n" ;
            close($log);
            return &Error("Won't send too long ($len) com");
        }
    }
    my $sent = $self->PrintSock($Answers, $msg ) ;
    unless ($sent) {
        &Debug("Answer not sent, ".__PACKAGE__.", l. ".__LINE__." ($!-$?-$@)");
        return 0 ;
    }
    # Make the success value explicit
    return $sent ;
}
# Send a comREQ request built from the arguments to the parent; nothing is
# done without argument.
sub Request {
    my ( $thread, @request ) = @_ ;
    return unless @request ;
    $thread->Answer( &GetCom( comREQ , @request ) );
}
# Should only be used by parent thread.
# Read up to $COM_BURST lines from the forked thread and return the
# non-empty ones, chomped. Reading stops at the first false value returned
# by ReadSock. Line lengths are accounted in the ANSWER-LEN-* statistics.
sub getAnswers {
    my ( $thread ) = @_ ;
    my @received = () ;
    my $budget = $COM_BURST ;
    while ( $budget-- and my $line = $thread->ReadSock($thread->{SON}) ) {
        # $line is true here, so it is necessarily defined
        chomp $line ;
        next unless $line ; # Skip empty lines
        my $len = length($line);
        &MAXSTAT('ANSWER-MAXLEN',$len);
        my $range = $len < 50   ? '000-049'
                  : $len < 100  ? '050-099'
                  : $len < 200  ? '100-199'
                  : $len < 500  ? '200-499'
                  : $len < 1000 ? '500-999'
                  :               '1000+' ;
        &UPSTAT('ANSWER-LEN-' . $range);
        push @received, $line ;
    }
    # Update read line counter
    &UPSTAT("READLINES-".$thread->{KIND},scalar(@received));
    return @received ;
}
# Ask the forked thread to quit and start the stopping timer.
# Returns 1 when the request was sent or the thread is already stopping,
# 0 when sending kept returning a negative value for 0.5 second.
sub AskToStop {
    my ( $thread ) = @_ ;
    return 1 if ($thread->StoppingSince());
    $thread->{QUITTING} = [ &gettimeofday() ];
    until ( $thread->AskThread( QUIT ) >= 0 ) {
        usleep $USLEEP ;
        if ( $thread->StoppingSince() >= 0.5 ) {
            &Info("Can't ask to quit: $! - $? - $@");
            return 0;
        }
    }
    return 1 ;
}
# Send an _INIT request with the given text and count it as pending.
# Returns the (false) value of ThreadError when the request can't be sent.
sub AskInit {
    my ( $thread, $init ) = @_ ;
    unless ($thread->AskThread( _INIT , "$init" )) {
        return $thread->ThreadError("1, can't ask to init '$init'");
    }
    $thread->{_INIT} ++ ;
}
# An _INIT answer was received: one less pending; log when none is left.
sub GotInit {
    my ( $thread ) = @_ ;
    $thread->{_INIT} -- ;
    &Debug("Thread " . $thread->getName . " ENV updated")
        unless ( $thread->{_INIT} );
}
# Seconds elapsed since the thread was asked to stop, or 0 when it was not.
sub StoppingSince {
    my ( $thread ) = @_ ;
    return 0 unless (defined($thread->{QUITTING}));
    return &tv_interval( $thread->{QUITTING} ) ;
}
# Forward log arguments: in the main process only a Logger object sends
# them, as a TODO request to the forked thread; in a forked process they
# are sent to the parent as a comCOM com. Returns 0 when nothing was sent,
# else the value of PrintSock().
sub DoLog {
    my ( $thread, @log ) = @_ ;
    &UPSTAT('DOLOG');
    my $sent = 0 ;
    if ( $$ != $maintid ) {
        $sent = $thread->PrintSock($thread->{DADDY},
            &GetCom( comCOM , $$ ,  @log ));
    } elsif ($thread->is('Logger')) {
        $sent = $thread->PrintSock($thread->{SON},
            &GetCom( comREQ, TODO, @log ));
    } else {
        &debugdev("Not expected to handle '@log' here");
        &UPSTAT('DOLOG-ERROR');
    }
    return $sent if ($sent);
    &UPSTAT('BAD-DOLOG');
    return 0 ;
}
# Process end block: only acts when this process is still running
# ForkedLoop ($THIS is set). When a forked thread other than the JobManager
# ends without having been asked to quit while on a job, the memorized
# signal messages are returned to the JobManager and the job is answered
# as done with status 255.
sub END {
    my $thread = $THIS;
    if (defined($thread)) {
        if ( $thread->{JOB} ) {
            my $state = $thread->{DO_QUIT} ? "quitting " : "" ;
            &Debug("Thread T$$ was " . $state . "on job $thread->{JOB}");
        }
        # Return any recent Sig message when thread seems aborting unexpectedly
        if ( $$ != $maintid and ! $thread->{DO_QUIT}
            and ! $thread->is('JobManager')
            and $thread->{JOB} and defined($thread->{DADDY})
            and (@SigWarn or @SigInfo))
        {
            # Try to notify the JobManager, one com by message
            foreach my $text ( @SigWarn ) {
                $thread->Return(
                    $thread->{JOB} => &GetCom( comINF, Warning => $text ));
            }
            foreach my $text ( @SigInfo ) {
                $thread->Return(
                    $thread->{JOB} => &GetCom( comINF, Info => $text ));
            }
            foreach my $text ( @SigDebug ) {
                $thread->Return(
                    $thread->{JOB} => &GetCom( comINF, Debug => $text ));
            }
            $thread->Return( $thread->{JOB} , 255 );
            $thread->AnswerDone( $thread->{JOB} );
            $thread->{JOB} = "" ;
        }
        # Update loggertid to log directly
        &LogSigMessage();
    }
}
# Hook called first by DESTROY; nothing to do by default.
sub PreDESTROY {
    return ;
}
# Destructor. Unless the object was flagged by setToClean: raises an alert
# when a forked thread seems to end abnormally, logs pending signal
# messages and the thread age, kills the forked process if still alive and
# logs the final status. The sockets are then shut down when the object
# has no TID or when running in the main process.
sub DESTROY {
    my $thread = shift;
    $thread->PreDESTROY();
    unless (defined($thread->{TOCLEAN})) {
        # Return any recent Sig message when thread seems aborting unexpectedly
        if ( $$ != $maintid and ! $thread->{DO_QUIT}
            and ! $thread->is('JobManager')
            and ( $thread->{JOB} or !defined($thread->{DADDY}) )
            and ( @SigWarn or @SigInfo ) )
        {
            my $lost = defined($thread->{DADDY}) ? "" :
                " with socket to parent lost" ;
            &Alert("T$$ seems aborted abnormally" . $lost,
                @SigWarn, @SigInfo, @SigDebug );
        }
        # Update loggertid to log directly
        &LogSigMessage();
        if ($thread->isForked()) {
            # Debug age
            &Debug(sprintf("%s Age = %.2f s", $thread->{NAME},
                &tv_interval( $thread->{FORKED} )));
            # Must kill forked thread if alive
            if ( $thread->isAlive() ) {
                &Debug("Killing thread object $thread->{NAME}");
                kill 9, $thread->{TID} ;
            }
        }
        if (defined($thread->{ERROR})) {
            &Error("Thread type object $thread->{NAME} destroyed with error #" .
                $thread->{ERROR});
        } else {
            &Debug("Thread object $thread->{NAME} destroyed");
        }
        &Debug("Thread object $thread->{NAME} status: $! - $? - $@")
            if ( $! or $? or $@ );
    }
    # Only close our sockets when never forked or when in the main process
    my @sockets = grep { defined($_) } ( $thread->{SON} , $thread->{DADDY} );
    if ( ! $thread->{TID} or $$ == $maintid ) {
        foreach my $socket ( @sockets ) {
            shutdown( $socket, 2 );
        }
    }
    my $tid = $thread->{TID} ? $thread->{TID} : $$ ;
    &Debug(__PACKAGE__." T".$tid." memory cleaned");
}
############################### Dedicated socket communication members
# Read whatever is pending on a socket, without blocking, and return one line.
#
# Parameters:
#   $self   - thread object, used for stat naming and to answer comTEST
#   $Socket - socket handle to read; it is switched to non-blocking mode
#
# Data read is split on newlines and appended to the per-socket line buffer
# (%ReadBuffer). Lines recognized by IsCom as a comCOM carrying comTEST are
# dropped from the lines and answered once with AnswerComDone.
#
# Returns the oldest buffered line for $Socket, or undef when the buffer is
# empty.
sub ReadSock {
    &TIMESTAT('ReadSock');
    my $self = shift ;
    my $Socket = shift ;
    my $fh = $lockfh{$Socket};
    @{$ReadBuffer{$Socket}} = () unless (defined($ReadBuffer{$Socket}));
    my $locked = 0 ;
    my $size = 0 ;      # Total of bytes read during this call
    my $read ;          # Result of the last sysread: bytes, 0 or undef
    my $bigevent = 0 ;  # Count of reads which completely filled our buffer
    my ( $sysreadbuffer, $buffer ) = ( "" , "" );
    $Socket->blocking(0);
    $locked = &TakeComLocking($fh) if ($COM_LOCKING and $maintid);
    $! = 0 ;
    # Keep the sysread result apart from the accumulated size: adding undef
    # to a number still gives a defined value, which would hide read errors
    while (defined($read = sysread($Socket, $sysreadbuffer, $MAX_BUFFER_SIZE))) {
        # Zero means end of file: nothing more to get
        last unless $read ;
        $size += $read ;
        $buffer .= $sysreadbuffer ;
        # bigread-event: when the read completely filled our buffer, more data
        # may be pending, so we must complete with a next sysread. A shorter
        # read means we got everything currently available.
        last if ( $read < $MAX_BUFFER_SIZE );
        &UPSTAT('BIGREAD-EVENT'.&SelfStatId($self,'full'));
        $bigevent ++ ;
        $sysreadbuffer = "" ;
    }
    # Remember a real read failure now as later calls may overwrite $!.
    # A busy resource (EAGAIN) only says nothing is pending on the socket.
    my $readerr = ( ! defined($read) and $! and $! != EAGAIN ) ? "$!" : "" ;
    # Don't stat a BUSY resource
    &UPSTAT('errno-'.int($!)."-$!") if ( $! and $! != EAGAIN );
    if ($buffer) {
        my @lines = split(/\n/,$buffer);
        my $count = scalar(@lines) ;
        my $check = comTEST ;
        # Check buffer could contain a comTEST event as test could involve
        # performance issue
        if ( $buffer =~ /$check/ ) {
            my @test ;
            # Check really now there's a formatted comTEST
            @lines = grep {
                @test = &IsCom( comCOM , $_ );
                $test[1] !~ /^$check$/
                } @lines ;
            if (@lines != $count ){
                &UPSTAT('GET-COMTEST'.&SelfStatId($self));
                &Debug("Received comTEST for " . $self->getName)
                    if $ADVANCED_DEBUGGING ;
                $self->AnswerComDone ;
            }
        }
        # Keep lines in socket dedicated buffer
        push @{$ReadBuffer{$Socket}} , @lines ;
        # Keep some monitoring scalar
        $STATS{'READ-LINES-DUMP'.&SelfStatId($self)} = "@lines" if $bigevent ;
        &MAXSTAT('MAX-READ-LINES',scalar(@lines));
        &MAXSTAT('MAX-READSIZE',$size);
        &MAXSTAT('MAX-READBUFFER-LINES',scalar(@{$ReadBuffer{$Socket}}));
    }
    # Stat a read failure other than a busy socket
    &UPSTAT('READSOCK-ERR-'.$readerr.&SelfStatId($self,'full'))
        if ( $readerr ne "" );
    &ReleaseComLocking($fh) if ( $locked );
    &TIMESTAT('ReadSock');
    return shift @{$ReadBuffer{$Socket}} ;
}
# Send lines on a socket through RealPrintSock, serialized with a lock file
# when com locking is enabled.
#
# Parameters:
#   $self   - thread object
#   $Socket - socket to write to
#   @_      - lines to send, passed through to RealPrintSock
#
# Without $COM_LOCKING (or $maintid) the call is passed straight to
# RealPrintSock. Otherwise a "<maintid>-<kind>.comlock" file obtained from
# GetTmpFile is opened once per process and kept opened in %lockfh, indexed
# by both the socket and the lock file name, and the write is done between
# TakeComLocking and ReleaseComLocking.
#
# Returns the RealPrintSock result, or the debugdev result when the lock
# file can't be opened.
sub PrintSock {
    my $self = shift ;
    &UPSTAT('PRINTSOCK-CALL'.&SelfStatId($self));
    my $Socket = shift ;
    my $locked = 0 ;
    # Return printing immediately unless we should use com locking
    unless ($COM_LOCKING and $maintid) {
        # maintid is always set after perl is loaded
        my $Ret = &RealPrintSock( $self, $Socket, @_ );
        return $Ret ;
    }
    # Only open lockfile one time by process, and keep it opened
    my $fh = $lockfh{$Socket} ;
    unless (defined($fh)) {
        $SocketProc{$Socket} = "$maintid-" . $self->getKind()
            unless (defined($SocketProc{$Socket}));
        my $lockfile = &GetTmpFile("$SocketProc{$Socket}.comlock");
        if (defined($lockfh{$lockfile})) {
            # Lock file still opened for another socket: share its handle
            $lockfh{$Socket} = $lockfh{$lockfile} ;
        } else {
            # Create the lock file when missing. The three-argument open keeps
            # the mode apart from the file name. A creation failure is
            # reported by the read/write open just below.
            unless ( -e $lockfile ) {
                close($lockfh{$Socket})
                    if ( open $lockfh{$Socket} , '>' , $lockfile );
            }
            unless ( open $lockfh{$Socket} , '+<' , $lockfile ) {
                # Keep the error text before any cleanup could change $!
                my $error = "$!" ;
                delete $lockfh{$Socket} ;
                return &debugdev(
                    "Can't open '$lockfile' com lock file: $error");
            }
            $lockfh{$lockfile} = $lockfh{$Socket} ;
        }
        $fh = $lockfh{$Socket} ;
    }
    $locked = &TakeComLocking($fh);
    my $Ret = &RealPrintSock( $self, $Socket, @_ );
    &ReleaseComLocking($fh) if ( $locked );
    #&TIMESTAT('PrintSock');
    return $Ret ;
}
# Queue lines in the object's COMBUFFER and tell what can be sent now.
#
# Parameters:
#   $self   - thread object owning the COMBUFFER array
#   $Socket - target socket; when undefined nothing is queued
#   $ref    - array ref of the lines to queue
#
# Returns an array ref: the COMBUFFER itself, or a new empty array when the
# socket is undefined or when RemoteIsFull reports the remote buffer may be
# full. With URGENTCOM set, lines are queued first and COMBUFFER is always
# returned.
sub Bufferize {
    my ( $self, $Socket, $ref ) = @_ ;
    return [] unless (defined($Socket));
    my $bufname = 'BUFFER-LINES'.&SelfStatId($self) ;
    &MAXSTAT($bufname."-LINES",scalar(@{$self->{COMBUFFER}}));
    # Urgent lines go in front of the queue and skip the remote check
    if ($self->{URGENTCOM}) {
        unshift @{$self->{COMBUFFER}} , @{$ref} ;
        return $self->{COMBUFFER} ;
    }
    # Standard lines are appended to the queue
    push @{$self->{COMBUFFER}} , @{$ref} ;
    # Return an empty array ref when the remote buffer may be full
    return [] if &RemoteIsFull($self,$Socket);
    return $self->{COMBUFFER} ;
}
# Estimate if sending more data could overfill the remote read buffer.
#
# Parameters:
#   $self   - thread object (URGENTCOM and COMBUFFER are read)
#   $Socket - target socket, index in %BUFLEN
#   $ref    - optional array ref of lines about to be sent
#
# The estimation adds the length of the given lines, the length of the first
# queued line plus one, the size accounted in %BUFLEN for the socket and a
# 4096 margin.
#
# Returns the estimated length when it is greater than $MAX_BUFFER_SIZE, and
# 0 otherwise, also when URGENTCOM is set or the socket is undefined.
sub RemoteIsFull {
    my ( $self, $Socket, $ref ) = @_ ;
    return 0 if ($self->{URGENTCOM});
    return 0 if ( ! defined($Socket) );
    # Lines are joined without separator to compute their length
    local $" = '' ;
    my $len = 0 ;
    $len = length("@{$ref}") if defined($ref);
    my $queued = $self->{COMBUFFER} ;
    if ( defined($queued) and @{$queued} ) {
        $len += length($queued->[0]) + 1 ;
    }
    $len += $BUFLEN{$Socket}  + 4096 ;
    return 0 unless ( $len > $MAX_BUFFER_SIZE );
    &UPSTAT('REMOTE-FULL-BUFFER-CHECK'.&SelfStatId($self,'full'));
    return $len ;
}
# Add a size to the amount accounted in %BUFLEN for a socket and publish the
# new value in the 'CUR-BUFFER-CONTROL' stat.
#
# Parameters:
#   $self   - thread object, used for stat naming
#   $Socket - socket, index in %BUFLEN
#   third   - size to add
# Returns the updated size.
sub RemoteBufSizeUpdate {
    my ( $self, $Socket, $added ) = @_ ;
    $BUFLEN{$Socket} += $added ;
    my $statname = 'CUR-BUFFER-CONTROL'.&SelfStatId($self,'full') ;
    return $STATS{$statname} = $BUFLEN{$Socket} ;
}
# Time of the last comTEST sent, indexed by socket: RealPrintSock stores a
# [ gettimeofday ] array ref here and ResetRemoteBufSize removes it
my %ComTest = () ;
# Forget what was accounted for a socket: its size in %BUFLEN is set back to
# zero and its last comTEST time is removed from %ComTest.
#
# Parameters:
#   $self   - thread object
#   $Socket - optional socket, the object's SON socket when not given
sub ResetRemoteBufSize {
    my ( $self, $Socket ) = @_ ;
    $Socket = $self->{SON} if ( ! defined($Socket) );
    $BUFLEN{$Socket} = 0 ;
    if (defined($ComTest{$Socket})) {
        delete $ComTest{$Socket} ;
    }
}
# Compute the suffix used to name stats related to an object.
#
# Parameters:
#   $self - thread object, may be undefined
#   any other argument asks for the object name in place of its kind
#
# Returns "" without object or when not running in the main process,
# otherwise '-' followed by the name or the kind of the object.
sub SelfStatId {
    my ( $self, @full ) = @_ ;
    return "" if ( ! defined($self) or $$ != $maintid );
    my $id = @full ? $self->getName : $self->getKind ;
    return '-' . $id ;
}
# Put lines which could not be sent back in front of a buffer.
#
# Parameters:
#   $self   - thread object, used for stat naming
#   $buffer - array ref of the queue to put the lines back in
#   $print  - array ref of the lines to requeue
# Returns the number of lines in $print.
sub RetryLater {
    my ( $self, $buffer, $print ) = @_ ;
    &UPSTAT('SOCK-RETRYLATER-EVENT'.&SelfStatId($self));
    unshift @{$buffer}, @{$print} ;
    my $requeued = @{$print} ;
    return $requeued ;
}
# Queue the given lines with Bufferize, then write queued lines on the socket
# with syswrite, taking care of the estimated remote buffer usage.
#
# Parameters:
#   $self   - thread object (URGENTCOM, TID and DADDY are read)
#   $Socket - socket to write to
#   @_      - lines to send
#
# Lines are written joined and ended by a newline. When RemoteIsFull reports
# the remote buffer may be full, a comTEST message is added to the write, at
# most once per second and per socket; otherwise the pending lines are put
# back in the queue with RetryLater to be sent by a later call.
#
# Returns 0 when nothing was written nor requeued. Otherwise returns minus
# the number of lines still queued when there are some, else the number of
# lines handled, else $total, which is the count of written bytes or -1 when
# lines had to be kept for later.
sub RealPrintSock {
    &TIMESTAT('RealPrintSock');
    my $self = shift ;
    my $Socket = shift ;
    # Below syntax is right but involve a bug
    #my $urgent = (defined($self) and exists($self->{URGENTCOM})) ?
    #    $self->{URGENTCOM} : 0 ;
    # This one is the last from a2p v1.020 without the bug
    # NOTE(review): as '=' binds tighter than 'and', $urgent receives the
    # result of defined($self) and $self->{URGENTCOM} is evaluated for
    # nothing. So $urgent is true for any defined object and each line is
    # written on its own. Following the above comment this is kept on purpose:
    # confirm before any change.
    my $urgent = defined($self) and $self->{URGENTCOM} ;
    my $count ;
    # Interpolated lists are joined with a newline in this sub
    local $" = "\n" ;
    my ( $total , $lines , $test ) = ( 0 , 0 , 0 );
    my @print = () ;
    $! = 0 ;
    my $buffer = &Bufferize( $self, $Socket , \@_ );
    # Loop while lines are queued, or to handle a possibly full remote buffer
    while ( @{$buffer} or &RemoteIsFull( $self, $Socket ) ) {
        if ( @{$buffer} ) {
            $lines ++;
            my $line = shift @{$buffer} ;
            push @print , $line ;
        }
        # $test keeps the estimated length when the remote buffer may be full
        $test = &RemoteIsFull( $self, $Socket, \@print );
        #$test = $self->{URGENTCOM} unless ( $test or ! defined($self));
        # Time to write: queue is empty, remote may be full or com is urgent
        if ( ! @{$buffer} or $test or $urgent ) {
            my $tries = 0 ;
            # Add comTEST to COM
            if ( $test ) {
                if (! defined($ComTest{$Socket})
                or &tv_interval($ComTest{$Socket}) >= 1) {
                    # Send again comTEST after 1 second
                    local $" = "' '" ;
                    my $comtest = &GetCom( comCOM, $$ , comTEST );
                    $ComTest{$Socket} = [ &gettimeofday() ] ;
                    &UPSTAT('ADD-COMTEST-EVENT'.&SelfStatId($self));
                    if ( $test >= $MAX_BUFFER_SIZE ) {
                        &UPSTAT('KEEP-COMTEST-EVENT'.&SelfStatId($self));
                        # Put the last line back in the queue to leave room
                        # for the comTEST
                        if (@print and ! $urgent) {
                            my $print = pop @print ;
                            $lines -- ;
                            unshift @{$buffer}, $print ;
                            &debugdev("Retry later '$print'" . ( @print ?
                                " keeping '@print'":"") .
                                " to be sure to not fullfill remote buffer" .
                                " with '$comtest'")
                                if ( $ADVANCED_DEBUGGING );
                        }
                    } elsif ( $ADVANCED_DEBUGGING ) {
                        &debugdev("Sending '$comtest'" .
                            (@print?" added to '@print'":""));
                    }
                    # Add comtest to print list
                    push @print, $comtest ;
                } elsif (@print) {
                    &UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
                    # Last comTEST was sent recently, we should better directly
                    # keep the com and retry it later
                    $lines -= &RetryLater($self,$buffer,\@print);
                    $total = -1 unless ($total);
                    last ;
                } else {
                    &UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
                    # Remote buffer can be full and we have recently sent a
                    # comTEST and there's nothing to send, so just leave
                    $total = -1 ;
                    last ;
                }
            }
            # Wait when socket is busy
            # The data to write is each line followed by a newline
            my $print = "@print" . $" ;
            # Retry the write while it fails with EAGAIN, sleeping $USLEEP
            # between tries, until too many tries or a quit request
            while (!defined($count = syswrite($Socket, $print))
            and $! == EAGAIN ) {
                &UPSTAT('BUSY-SOCKET-EVENT'.&SelfStatId($self));
                last if ( $tries ++ > 1000 );
                usleep $USLEEP ;
                last if $MUSTQUIT ;
                &Debug("$Socket socket seems busy with errno: $!")
                    unless ( $tries % 20 );
            }
            if ( $tries ) {
                local $" = ' ' ;
                &MAXSTAT('CHARSOCK-RETRY',$tries);
                $STATS{'CHARSOCK-RETRY'.&SelfStatId($self)} = "@print" ;
            }
            if (defined($count)) {
                # NOTE(review): $count is not compared to the length of the
                # data, so a partial write would not be detected here: confirm
                # writes on this socket are all-or-nothing.
                if ($count) {
                    # Account written bytes as used in the remote buffer
                    &RemoteBufSizeUpdate( $self, $Socket, $count );
                    $total += $count ;
                    &MAXSTAT('CHARSOCK-COUNT',$count);
                    &UPSTAT('LINES-SOCK'.&SelfStatId($self,'full'),$lines);
                } elsif ($lines) {
                    # Nothing was written: keep the lines for a later call
                    &UPSTAT('LINES-SOCK-NOTSENT'.&SelfStatId($self,'full'),
                        $lines);
                    $lines -= &RetryLater($self,$buffer,\@print);
                    $total = -1 unless ($total);
                    last ;
                }
                # Stop here when the remote buffer was seen as possibly full
                last if $test ;
            } elsif ($! == EPIPE) {
                # Broken pipe: the socket is closed on our side
                close($Socket);
                if ($self->{TID}) {
                    $loggertid = 0 if ($self->getKind eq 'Logger');
                    &Warn("Communication has been reset with ".$self->getName);
                    $do_check ++ ;
                } elsif ( $Socket == $self->{DADDY} ) {
                    # Socket to parent is lost: ask to quit
                    $loggertid = 0 ;
                    $MUSTQUIT ++ ;
                    &Warn("Must quit as my socket to parent has been reset");
                }
            } else {
                # Any other failure: keep the lines for a later call
                local $" = "' '" ;
                &Debug("$Socket socket is busy, will retrying '@print' later");
                $STATS{'SOCKET-ERROR-'.int($!).&SelfStatId($self)} =
                    "Can't send '@print'";
                $lines -= &RetryLater($self,$buffer,\@print);
                $total = -1 unless ($total);
                last ;
            }
            @print = () ;
        }
        last if $MUSTQUIT ;
    }
    &TIMESTAT('RealPrintSock');
    if ($total) {
        &MAXSTAT('MAX-TOTALSOCK'.&SelfStatId($self),$total);
        &MAXSTAT('MAX-LINESOCK'.&SelfStatId($self),$lines);
        # Negative count of lines still queued, else handled lines, else total
        return @{$buffer} ? - @{$buffer} : ( $lines ? $lines : $total );
    }
    return 0 ;
}
# At exit, when com locking is enabled, close every handle kept in %lockfh
# and remove the existing files whose name, used as index in %lockfh, matches
# the $SERVICE_TMP prefix.
END {
    if ( $COM_LOCKING ) {
        # %lockfh is indexed by sockets and by lock file names
        for my $key (keys(%lockfh)) {
            close($lockfh{$key}) ;
            unlink $key if ( $key =~ /^$SERVICE_TMP/ and -e $key );
        }
    }
}
# Trace the module load with its version; this statement is at file level so
# it is run when the file itself is executed at load time
&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
# Perl requires a module file to end with a true value
1;