source: A2P/a2p/A2P/Thread.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 44.3 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: Thread.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22
23package A2P::Thread;
24
25use strict ;
26use Socket ;
27use Errno qw(:POSIX);
28use Fcntl ':flock' ;
29use IO::Socket ;
30use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
31use POSIX qw(:signal_h :sys_wait_h setsid) ;
32use A2P::Globals ;
33use A2P::Syslog  ;
34use A2P::threads ;
35use A2P::threads qw( threadSleep reduceSleep ) ;
36use A2P::Signal  ;
37use A2P::Signal  qw( LogSigMessage ) ;
38use A2P::Globals qw( UpdateSharedEnv ) ;
39use A2P::Com qw( comREQ comCOM comJOB comUPD comDONE comINF comXML comTEST
40                 IsCom GetCom  TakeComLocking ReleaseComLocking GetTmpFile );
41use A2P::JobStatus 'a2pjobstate' ;
42
43BEGIN {
44    our $VERSION = sprintf "%s", q$Rev: 1399 $ =~ /(\d[0-9.]+)\s+/ ;
45}
46our $VERSION ;
47
48# Hash used for communications
49my %ReadBuffer = () ;
50my %lockfh     = () ;
51my %SocketProc = () ;
52my %BUFLEN     = () ;
53my $THIS ;
54
55sub new {
56    my $class = shift ;
57
58    &Debug("new $class v$VERSION");
59
60    $class =~ /^A2P::(.*)$/ ;
61    my $self = {
62        KIND       =>  $1,
63        NAME       =>  $Progname . '-' . $1 ,
64        SIGTERM    =>  0,
65        SIGKILL    =>  0,
66        TOPING     =>  0,
67        LOGGER     =>  0,
68        BUSY       =>  [ 0, "" ],
69        DO_QUIT    =>  0,
70        DO_STAT    =>  [ &gettimeofday() ],
71        CanPING    =>  [ &gettimeofday() ],
72        HAVE_STATS =>  0,
73        STATS      =>  [],
74        DONTSLEEP  =>  0,
75        URGENTCOM  =>  0,
76        COMBUFFER  =>  [],
77        JOB        =>  ""
78    };
79
80    return bless $self , $class ;
81}
82
83sub init {
84    my $self = shift ;
85    &Debug("new A2P::Thread is " . $self->getName);
86
87    return $self->ThreadError("1, '$self->{KIND}' is not a valid thread kind")
88        unless defined($SonKind{$self->{KIND}}) ;
89
90    socketpair( $self->{SON}, $self->{DADDY}, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
91        or return $self->ThreadError("2, Can't open socket pair for " .
92        $self->{KIND} . " thread: $!");
93
94    bless $self->{SON}   , "IO::Socket" ;
95    bless $self->{DADDY} , "IO::Socket" ;
96
97    &Debug($self->getName . " object is ready");
98    1 ;
99}
100
101our $AbortMessages ;
102
103sub defautABTERMError {
104    return () ;
105}
106
107sub ABTERM {
108    my $self = shift ;
109    my ( $package, $filename, $line ) = caller ;
110    my $Err  = $_[0] =~ /^\d+$/ ? shift : $line ;
111
112    @AbortTime = &gettimeofday();
113
114    my @ERR = $self->defautABTERMError() ;
115
116    push @ERR, (defined($AbortMessages) and defined($AbortMessages->{$Err}))?
117        ( $Err < 100 ?
118            $AbortMessages->{$Err}
119            :
120            sprintf( $AbortMessages->{$Err} , @_ )
121        )
122        : @_ ;
123
124    &Error( @ERR );
125
126    # Return information to JobManager is JOB defined and not JobManager
127    map { $self->Return( $self->{JOB} => &GetCom( comINF, 'ABTERM', $_ )) } @ERR
128        if (defined($self->{JOB}) and ! $self->is('JobManager'));
129
130    map { &a2pjobstate( $self->{JOB}, $self->{STEP}, 'A',
131            { ABTERM => $_ } ) } @ERR
132        if (defined($self->{JOB}) and defined($self->{STEP}));
133
134    &UPSTAT('JOB-ABTERM');
135    0 ;
136}
137
138sub getBusyReq {
139    # Cosmetic function to return debugging info for later debug
140    my $self = shift ;
141    my @business = @{$self->{BUSY}};
142    shift @business ;
143    return "@business" ;
144}
145
146sub setBusy  {
147    my $self = shift ;
148    @{$self->{BUSY}} = @_ ;
149    &UPSTAT('SET-BUSY-SON') if ( $_[0] );
150}
151
152sub isBusyBy {
153    my $self = shift ;
154    # {BUSY} is initialized to [ 0 , "" ] at object creation
155    if ( @_ ) {
156        return $self->{BUSY}->[0] == $_[0] ;
157    } else {
158        return $self->{BUSY}->[0];
159    }
160}
161
162sub setReady {
163    &UPSTAT('SET-READY-SON');
164    $_[0]->setBusy( 0, "" );
165}
166
167sub self_TEST {
168    my $self = shift ;
169    $do_test = 0 ;
170
171    our @TT ;
172    $STATS{'RUNNING'} = sprintf("%.01fs", &tv_interval(\@TT) - $^T);
173
174    # Output current memorized dynamic statistics
175    my @stats = grep { $STATS{$_} } sort(keys(%STATS)) ;
176    my $tag = $maintid ? "-$maintid" : "" ;
177    if (@stats) {
178        open LOG, ">>", "/tmp/$Progname$tag-selftest.log"
179            or return &Error("Can't open log file in /tmp for self-test: $!");
180
181        # Don't check anything as this is not critical
182        flock(LOG, LOCK_EX);
183
184        foreach my $stat
185            ( map { sprintf("%50s %s", $_, $STATS{$_}) } @stats ) {
186            print LOG $stat, "\n" ;
187        }
188        flock(LOG, LOCK_UN);
189        close(LOG);
190    }
191
192    # Output current memorized log lines
193    my @logs = &getBackLogs();
194    if ( @logs ) {
195        open LOG, ">>", "/tmp/$Progname$tag-selfdebug.log"
196            or return &Error("Can't open log file in /tmp for self-debug: $!");
197
198        flock(LOG, LOCK_EX);
199
200        foreach my $debug ( @logs ) {
201            print LOG $debug, "\n" ;
202        }
203        flock(LOG, LOCK_UN);
204        close(LOG);
205    }
206
207    # Output other things
208    foreach my $Array qw( REQUESTS_ARRAY ) {
209        if (defined($self->{$Array})) {
210            my @Array = @{$self->{$Array}} ;
211            if (@Array) {
212                my $ii = 0 ;
213                my $li = "[%0" . scalar(@{$self->{$Array}}) . "d]: " ;
214
215                open LOG, ">>", "/tmp/$Progname$tag-self$Array.log"
216                    or return &Error("Can't open log file in /tmp for " . $Array
217                    . " array: $!");
218
219                flock(LOG, LOCK_EX);
220
221                foreach my $val ( @Array ) {
222                    $val = ref($val) =~ /^ARRAY/i ? join('',@{$val}) : $val ;
223                    print LOG $Array, sprintf($li,$ii++), $val, "\n" ;
224                }
225                flock(LOG, LOCK_UN);
226                close(LOG);
227            }
228        }
229    }
230}
231
232sub Delete {
233    my $self = shift ;
234    &Debug("Deleting T$self->{TID} thread from sons list")
235        unless (defined($self->{TOCLEAN}));
236    $sons{$self->{TID}} = 0 ;
237    delete $sons{$self->{TID}} ;
238    undef $self
239}
240
241sub ThisError {
242    my ( $package, $filename, $line ) = caller ;
243    $filename =~ m|([^/]+)$| ;
244    &Error($_[0]->getName . ": from $1, L.$line: " . $_[1]);
245    0 ;
246}
247
248sub commonloopdebug {
249    my $self = shift ;
250    &Debug("Current BUFLEN:", map { "$_ -> $BUFLEN{$_}" } keys(%BUFLEN));
251}
252
253sub loopdebug ;
254*loopdebug = \&commonloopdebug ;
255
256sub ThreadError {
257    $_[0]->{ERROR} = $_[1] ;
258    0 ;
259}
260
261sub setToClean {
262    $_[0]->{TOCLEAN} = 1 ;
263}
264
265sub SwitchLogger {
266    # Only called from parent
267    return 0 if ( $_[0]->{LOGGER} == $_[1] or ! $_[0]->{TID} );
268    &Debug("Switching logger to T$_[1] for " . $_[0]->getName);
269    $_[0]->{LOGGER} = $_[1] ;
270    $_[0]->AskThread( _INIT , "<logger>$_[1]</logger>" );
271}
272
273sub getName {
274    return $_[0]->{NAME} ;
275}
276
277sub getKind {
278    return $_[0]->{KIND} ;
279}
280
281sub is {
282    return grep { $_[0]->getKind() eq $_ } @_ ;
283}
284
285sub isForked {
286    return defined($_[0]->{FORKED}) ? 1 : 0 ;
287}
288
289sub isAlive {
290    return 0 unless ( $_[0]->{TID} );
291    my $test = 0 ;
292    $test ++ if (kill 0 , $_[0]->{TID});
293    # TODO Contrôler errno p 371 linux PT
294    &Debug( $_[0]->getName . ( $test ? " seems alive" : " seems dead" ) );
295    return $test ;
296}
297
298sub isToCheck {
299    return 0 if (defined($_[0]->{QUITTING}));
300    # A thread is to check if it has been check more than 15 seconds ago
301    return $_[0]->LastCheck() > 15 ? $_[0]->CanPing() : 0 ;
302}
303
304sub KeepStats {
305    my $self = shift ;
306    return unless ( $self->{HAVE_STATS} and $KEEP_STATS and -d $STATS_FOLDER );
307
308    if (@_) {
309        my @time = localtime ;
310        push @{$self->{STATS}}, [
311            sprintf('%4d%02d%02d', 1900+$time[5], $time[4]+1, $time[3]),
312            sprintf('%02d%02d%02d', $time[2], $time[1], $time[0])
313            , $self->myStats(\@_) ];
314    }
315
316    # Only keep stat after at least 1 second has passed, and there's some
317    # But keep output stats if we are quitting
318    return unless ( @{$self->{STATS}}
319        and ($self->{DO_QUIT} or &tv_interval($self->{DO_STAT}) > 1));
320
321    my %Locked = () ;
322    my $File ;
323    foreach my $Stat ( @{$self->{STATS}} ) {
324
325        # Compute filename
326        my $statfile = $self->getName() . '-' . $Stat->[0] ;
327
328        unless ( $Locked{$statfile} ) {
329
330            open $File, ">>$STATS_FOLDER/$statfile"
331                or return $self->ThreadError("Can't open stats file '" .
332                    $statfile . "': $!");
333
334            # Try exclusive lock
335            my $locked = flock($File, LOCK_EX);
336
337            unless ($locked) {
338                close($File);
339                return $self->ThreadError("Can't lock stats file '" .
340                    $statfile . "': $!");
341            }
342
343            if ( defined($self->{STATHEADERS}) and @{$self->{STATHEADERS}} ) {
344
345                # Force again headers output after a restart, in case they
346                # have changed
347                print $File join(';', @{$self->{STATHEADERS}}), "\n"
348                    or return $self->ThreadError("Can't output in file '" .
349                        $statfile . "': $!");
350
351                @{$self->{STATHEADERS}} = () ;
352            }
353
354            $Locked{$statfile} = $File ;
355        }
356
357        $File = $Locked{$statfile} ;
358        print $File join(';', @{$Stat}), "\n"
359            or return $self->ThreadError("Can't output in statfile '" .
360                $statfile . "': $!");
361    }
362
363    $self->{STATS} = [ ] ;
364
365    # Unlock and close stat files
366    map { flock($Locked{$_}, LOCK_UN) and close($Locked{$_}) } keys(%Locked);
367
368    # Reset stats timer
369    $self->{DO_STAT} = [ &gettimeofday() ];
370}
371
372sub LastCheck {
373    return defined($_[0]->{CHECKED}) ? &tv_interval($_[0]->{CHECKED}) : 100 ;
374}
375
376sub Check {
377    $_[0]->{TOPING} ++ ;
378    # A thread is really busy if it has not say any thing since 60 seconds
379    # (see isToCheck())
380    &Info($_[0]->getName . " thread seems busy since " .
381      sprintf("%.2f s",$_[0]->{TOPING}))
382        unless ( $_[0]->{TOPING} and $_[0]->{TOPING} % 4 );
383    $_[0]->{CHECKED} = [ &gettimeofday() ];
384    $_[0]->{CanPING} = [ &gettimeofday() ];
385}
386
387sub Ping {
388    $_[0]->AskThread( PING ) if ( $_[0]->{TOPING} and $_[0]->CanPing() );
389}
390
391sub GotPing {
392    $_[0]->{CHECKED} = [ &gettimeofday() ];
393    $_[0]->{TOPING} -- ;
394    $_[0]->{CanPING} = [ &gettimeofday() ];
395}
396
397sub CanPing {
398    # We can repeat a ping only if last ping was done more than 5 seconds ago
399    return 0 unless (defined($_[0]->{CanPING}));
400
401    if (&tv_interval($_[0]->{CanPING}) > 5) {
402        $_[0]->{CanPING} = [ &gettimeofday() ] ;
403        return 1 ;
404    }
405    0 ;
406}
407
408sub SendSigTerm {
409    # Send SIGTERM one time after at least 5 seconds
410    my $self = shift;
411    my $since = $self->StoppingSince();
412    return 0 unless ( $since > 5 and $since < 30 and ! $self->{SIGTERM} );
413    &Debug("Killing " . $self->getName . " with SIGTERM");
414    kill 15, $self->{TID} ;
415    return ++ $self->{SIGTERM} ;
416}
417
418sub SendSigKill {
419    # Send SIGKILL one time after at least 30 seconds
420    my $self = shift;
421    return 0 if ( $self->{SIGKILL} or $self->StoppingSince() < 30 );
422    &Debug("Killing " . $self->getName . " with SIGKILL");
423    kill 9, $self->{TID} ;
424    return ++ $self->{SIGKILL} ;
425}
426
427################################################################################
428##          Fork code                                                         ##
429################################################################################
430sub Fork {
431    my $self = shift;
432
433    # Security: Only one fork call can be done by created Thread object
434    return undef if ($self->isForked());
435
436    # Classical way to fork
437    $self->{TID} = fork ;
438
439    # Return to parent early if not forked
440    return undef unless (defined($self->{TID}));
441
442    $self->{FORKED} = [ &gettimeofday() ];
443
444    # Clean SYSLOG socket
445    if (defined($SYSLOG)) {
446        shutdown($SYSLOG,2);
447        $SYSLOG = undef ;
448        &Debug("SYSLOG socket is reset");
449    }
450
451    # Return to parent
452    if ( $self->{TID} ) {
453        #============ In parent =============
454        &Debug("Parent-to-child link is " . $self->{SON} );
455        push @PIPE , $self->{SON} ;
456
457        # Set the filename for COM locking
458        $SocketProc{$self->{SON}} = "$maintid-" . $self->getKind()
459            if $COM_LOCKING ;
460
461        # Can define logger socket for parent
462        &SetLogger( $self ) if ($self->is('Logger'));
463
464        # Update object name as it is used in logging messages
465        $self->{NAME} .= '[' . $self->{TID} . ']' ;
466
467        # Return to caller in parent
468        return $self->{TID} ;
469    }
470
471    #=== In child, we won't leave this object member ===
472
473    # Clean error status
474    $? = 0 ;
475    $! = 0 ;
476
477    # Clean open COM locking sockets and communications hashes
478    if ( $COM_LOCKING ) {
479        map { close($lockfh{$_}) } keys(%lockfh);
480        %SocketProc = () ;
481        %lockfh     = () ;
482    }
483    %ReadBuffer = () ;
484    %BUFLEN     = () ;
485
486    # Clear stats
487    map { $STATS{$_} = 0 } keys(%STATS) ;
488
489    # Update immedialty signal handling,
490    # Keep in mind these signal handlers won't be called as object member
491    $SIG{'USR1'} = \&mySigUSR1 ;
492    $SIG{'ALRM'} = \&mySigALRM ;
493    $SIG{'TERM'} = \&mySigTERM ;
494    $SIG{'QUIT'} = \&mySigTERM ;
495    $SIG{'HUP'}  = \&mySigHUP  ;
496    $SIG{'ABRT'} = \&mySigTERM ;
497
498    # Update child process name
499    $0        = $self->getName ; # Update process name with 'ps' command
500    $Progname = $self->getName ; # For logging
501
502    # Reduce priority as parent must be the one with the greater priority
503    my $priority = getpriority( 0, $$ );
504    if ( $priority < 15 ) {
505        # Increase our priority
506        setpriority 0, 0, $priority + 5
507            or &Warn("Can't update priority: $!");
508    }
509
510    $self->ThreadInitEarly();
511
512    &Debug("Child " . $self->getName . " is starting...");
513    &Debug("Child-to-parent link is " . $self->{DADDY} );
514    push @PIPE , $self->{DADDY} ;
515
516    # Cleaning this process from previously created objects
517    map {
518        if ( $_ != $$ ) {
519            &Debug("Cleaning unused T$_ cloned object during fork");
520            $sons{$_}->setToClean() ; # Just to avoid checking from DESTROY sub
521            $sons{$_}->Delete();
522        }
523    } keys(%sons);
524    &Debug("Memory cleaned");
525
526    &Debug("Starting " . $self->getName . " T$$");
527
528    $self->ThreadInit();
529
530    my $Ret = $self->ForkedLoop ;
531
532    $loggertid = 0 ; # Force to log directly
533    &LogSigMessage();
534
535    &Debug($self->getName . " ended ($MUSTQUIT;$!;$?;$@)");
536    &Debug("Forced to quit ($MUSTQUIT)") if $MUSTQUIT ;
537
538    $self->Answer( QUIT );
539
540    $self->KeepStats();
541
542    $self->DoBeforeQuit();
543
544    exit $Ret ;
545}
546
547my @bad_answer = () ;
548my @KeepRequest = () ;
549sub ForkedLoop {
550    my $self = $THIS = shift ;
551    my $ErrCount = 0 ;
552    my $request ;
553    my $LoopTiming = [ &gettimeofday() ];
554    my $PendingStatus = [ &gettimeofday() ];
555    my $LoopCount  = 0 ;
556    my $SleepFactor = 1 ;
557    $! = 0 ;
558
559    while ( ! ( $MUSTQUIT or $self->{DO_QUIT} ) ) {
560        &TIMESTAT('ForkedLoop');
561
562        &LogSigMessage();
563
564        # Check to commit any pending job status at last each minute
565        if (&tv_interval($PendingStatus) > 60) {
566            &a2pjobstate ;
567            $PendingStatus = [ &gettimeofday() ];
568        }
569
570        &UPSTAT('LOOPS');
571        $self->{DONTSLEEP} ? &reduceSleep('no stats') : &threadSleep() ;
572
573        # Just to flush buffer
574        $self->PrintSock($self->{DADDY});
575
576        $self->KeepStats();
577
578        # Test thread sub, can be useful while debugging
579        $self->self_TEST() if $do_test ;
580
581        my $max = $COM_BURST ;
582        while ( $max-- and
583            (defined( $request = $self->GetRequest($self->{DADDY})))) {
584
585            last if ( $MUSTQUIT or $self->{DO_QUIT} );
586
587            chomp $request ;
588
589            my @Req = &IsCom( comREQ , $request );
590
591            if ( @Req == 2 ) {
592
593                if ( $Req[0] == QUIT ) { # Do I have to quit
594                    &Debug("Notified to QUIT by parent");
595                    $self->{DO_QUIT} = 1 ;
596
597                } elsif ( $Req[0] == PING ) { # Do I have to ping
598                    $self->Answer( PING );
599                    &Debug("PING");
600
601                } elsif ( $Req[0] == _INIT ) { # ENV update
602                    $self->Answer( _INIT );
603
604                    if ( $Req[1] =~ m|^<init>\s*(\$ENV{\w+}\s*=.*)</init>$|i ) {
605                        if ( $self->{clientrequest} ) {
606                            # SECURITY issue:
607                            # Only Listener is concerned: no external init
608                            # accepted for now
609                            $self->CloseClient( $self->{clientrequest}, 1,
610                                "INIT is not supported from other application");
611
612                        } else {
613                            &Debug("Do INIT: $1");
614                            eval $1 ;
615                        }
616
617                    } elsif ( $Req[1] =~ m|^<init>CAN_UPDATE_NOW</init>$|i ) {
618                        # Update shared vars with ENV definition
619                        &UpdateSharedEnv ;
620                        # Reset Debug comportement
621                        &ResetDebug() ;
622                        $self->InitUpdated();
623
624                    } elsif ( $Req[1] =~ m|^<logger>(\d+)</logger>$|i ) {
625
626                        if ( $1 == $$ ) {
627                            &Debug("Approuved as logger");
628
629                        } elsif ( $self->is('Logger') ) {
630                            &Debug("Revoked as logger thread");
631                            # We should quit as we are no more used for service
632                            $self->{DO_QUIT} = 1 ;
633
634                        } else {
635                            &Debug("My logger is T$1") if $1 ;
636                            $loggertid = $1 ;
637                        }
638
639                    } else {
640                        $self->ThisError("Not able to evaluate '" . $Req[1] .
641                            "' as ENV update");
642                        $ErrCount ++ ;
643                    }
644
645                } elsif ( $Req[0] == TODO ) {
646                    &Debug("Got 'TODO' COM: '$Req[1]'") if $ADVANCED_DEBUGGING ;
647                    $ErrCount ++ unless $self->Do( \$Req[1] );
648
649                    # Called to do our job to we should sleep less next time
650                    &reduceSleep();
651
652                } else {
653                    $self->ThisError("[DEV] Can't decide request " .
654                        (defined($means{$request})?$means{$request}:$request));
655                    $ErrCount ++ ;
656                }
657
658            # TODO Test me with also $Req[0] == _UPDATE
659            } elsif ( @Req = &IsCom( comUPD , $request ) ) {
660                if ( $Req[1] =~ m|^<update>(\w+)=(["'])?(.*)\2</update>$|i ) {
661                    my ( $var , $value ) = ( $1 , $3 );
662                    if (defined($1) and defined($3)) {
663                        if ( grep { /$var/ } @SHARED ) {
664                            my $apos = $value =~ /^\d+$/ ? '' : '"' ;
665                            eval '$' . $var . '=' . $apos . $value . $apos ;
666                            # For important thread initialization
667                            $self->InitUpdated();
668                            &UPSTAT($var.'_DIRECT_UPDATE');
669
670                        } else {
671                            $self->ThisError("Not authorized to update '" . $var
672                                . "' to '$value'");
673                            &UPSTAT('UNAUTH_DIRECT_UPDATE');
674                            $ErrCount ++ ;
675                        }
676
677                    } else {
678                        $self->ThisError("Bad direct update request '$Req[1]'");
679                        &UPSTAT('BAD_DIRECT_UPDATE');
680                        $ErrCount ++ ;
681                    }
682
683                } else {
684                    $self->ThisError("Bad formated update request '$Req[1]'");
685                    &UPSTAT('BADFORMAT_UPDATE');
686                    $ErrCount ++ ;
687                }
688
689            } elsif (  @Req = &IsCom( comCOM , $request ) ) {
690                if ( $Req[1] == comDONE ) {
691                    &UPSTAT('GETCOMDONE');
692
693                    &Debug("Received comDONE from parent")
694                        if $ADVANCED_DEBUGGING ;
695                    $self->ResetRemoteBufSize($self->{DADDY});
696
697                } else {
698                    $self->ThisError("[DEV] Can't handle request '$request'");
699                    $ErrCount ++ ;
700                }
701
702            # Handle XML request
703            } elsif ( @Req = &IsCom( comXML , $request ) ) {
704                &Debug("Got an XML request");
705
706                # Check we have XML API and call it
707                if (UNIVERSAL::can( $self , 'XML' )) {
708                    $self->XML($request);
709
710                } else {
711                    $self->ThisError("[DEV] Not expected to handle '" . $request
712                        . "' XML request");
713                    $ErrCount ++ ;
714                }
715
716            } else {
717                if ( $request =~ /^[<]/ ) {
718                    my $ref ;
719                    &UPSTAT('COM-CHECK-FRAGMENT');
720                    # Control this com is still not handled
721                    if (($ref) = grep { $_->[0] eq $request } @bad_answer ) {
722                        # Don't keep too much time
723                        next if (++ $ref->[1] < 100);
724
725                    } else {
726                        # Will try to reassemble a fragmented com
727                        push @bad_answer, [ $request, 0 ] ;
728                        $max ++ ;
729                        next ;
730                    }
731
732                } elsif (@bad_answer) {
733                    # Try to assemble and reinject immediatly
734                    my $try = shift @bad_answer ;
735                    my $com = $try->[0] . $request ;
736                    &Info("Trying reassembled com: $com");
737                    push @KeepRequest, $com ;
738                    &UPSTAT('COM-REASSEMBLED');
739                    $max ++ ;
740                    next ;
741                }
742
743                # This only critical if not in Logger
744                if ($self->getKind() eq 'Logger') {
745                    &Warn("Critical error on " . $self->getName);
746                } else {
747                    &Alert("Critical error on " . $self->getName);
748                }
749                $self->ThisError("[DEV] Can't interpret request '$request'");
750                $ErrCount ++ ;
751            }
752
753        }# Loop COM_BURST
754
755        # Check parent is alive
756        unless ( kill 0, $maintid ) {
757            # TODO Contrôler errno p 371 de linux pt
758            $loggertid = 0 ;
759            &Warn("Quitting as parent seems dead");
760            $MUSTQUIT = 1 ;
761        }
762
763        { # Rating the do loop
764            no integer ;
765            $LoopCount += 100 ;
766            my $int = &tv_interval($LoopTiming);
767            if ( $int >= 1 ) {
768                $LoopCount = 0 ;
769                $LoopTiming = [ &gettimeofday() ];
770                &MAXSTAT('DOLOOP-RATE',int($LoopCount/$int)/100);
771            }
772        }
773
774        if ( $ADVANCED_DEBUGGING ) {
775            &Debug("Loop rate is " . $STATS{'DOLOOP-RATE'} );
776            $self->loopdebug();
777        }
778
779        &TIMESTAT('ForkedLoop');
780    }# Loop to read queue
781
782    undef $THIS ;
783
784    &Debug("$ErrCount counted errors") if $ErrCount ;
785    return $ErrCount ? 1 : 0 ;
786}
787################################################################################
788##          End Fork code                                                     ##
789################################################################################
790
791sub ThreadInit {
792    1 ; # Nothing to do by default
793}
794
795sub ThreadInitEarly {
796    # Directly link Log answers to parent (defined in Syslog.pm)
797    &SetLogger( $_[0] );
798
799    # This member must be overrided in Logger Object
800}
801
802sub InitUpdated {
803    1 ; # Nothing to do by default
804}
805
806sub GetRequest {
807    my $self     = shift ;
808    my $Requests = shift ;
809
810    # Check if we got a fragmented not safe request
811    return shift @KeepRequest if @KeepRequest ;
812
813    return ( $MUSTQUIT or $self->{DO_QUIT} )? 1 : $self->ReadSock($Requests) ;
814}
815
816sub Do {
817    my $self = shift ;
818    my $ref  = shift ;
819    &Debug("Got to do '$$ref'");
820}
821
822sub DoBeforeQuit {
823    1 ; # Nothing to do by default
824}
825
826sub mySigUSR1 {
827    push @SigDebug, "SIGUSR1 received in " . __PACKAGE__ ;
828    $do_test ++ ;
829}
830
831sub mySigALRM {
832    return unless ($SCAN_SPOOL);
833    $do_file ++ if ( $do_file < $MAXTASK and $SCAN_SPOOL );
834    push @SigDebug, "SIGALRM received (do_file=$do_file) in " . __PACKAGE__ ;
835}
836
837sub mySigTERM {
838    my $ref = $_[0] eq 'ABRT' ? \@SigWarn : \@SigDebug ;
839    push @{$ref}, "SIG$_[0] received in " . __PACKAGE__ ;
840    $MUSTQUIT ++ ;
841    $NO_SYSLOG_DEBUG = 0 if $SERVICE_DEBUG ;
842}
843
844sub mySigHUP {
845    push @SigDebug, "SIGHUP received in " . __PACKAGE__ ;
846}
847
848sub mySigPIPE {
849    my ($package, $filename, $line) = caller();
850    push @SigDebug , "SIG$_[0] received at L. $line in $package: $! $?" ;
851    $MUSTQUIT ++ unless ( grep { defined($_) } @PIPE );
852}
853
854sub myStats {
855    my $self = shift ;
856    return @{$_[0]} ;
857}
858
859sub AskThread {
860    my $self   = shift ;
861    my $Asking = $self->{SON} ;
862
863    if (@_ > 10) {
864        return &Error("[DEV] Too large argument for AskThread: @_");
865    }
866
867    my $msg = &GetCom( comREQ , @_ );
868    {
869        my $len = length($msg);
870        &MAXSTAT('REQUEST-LEN', $len);
871        if ( $len < 50 ) {       &UPSTAT('REQUEST-LEN-000-049');
872        } elsif ( $len < 100 ) { &UPSTAT('REQUEST-LEN-050-099');
873        } elsif ( $len < 200 ) { &UPSTAT('REQUEST-LEN-100-199');
874        } elsif ( $len < 500 ) { &UPSTAT('REQUEST-LEN-200-499');
875        } elsif ( $len < 1000) { &UPSTAT('REQUEST-LEN-500-999');
876        } else {
877            &UPSTAT('REQUEST-LEN-1000+');
878            if ( $len > 9999 ) {
879                open LOG, ">" . &GetTmpFile("$Progname-lastbigreq.log")
880                    or return &Error("Can't open big req log file in TMP: $!");
881                print LOG $msg, "\n" ;
882                close(LOG);
883            }
884        }
885    }
886
887    &UPSTAT("SENDLINES-".$self->{KIND});
888
889    return $self->PrintSock( $Asking, $msg ) ;
890}
891
892sub AnswerDone {
893    $_[0]->Answer( &GetCom( comJOB , $_[1] , &GetCom( comDONE )));
894}
895
896sub AnswerComDone {
897    my $self   = shift ;
898    my $socket = $self->{SON} ;
899    $socket = $self->{DADDY} unless ( $$ == $maintid );
900    $self->{URGENTCOM} = 1 ;
901    if (defined($socket)) {
902        $! = 0 ;
903        $self->ThisError("Can't answer comDONE".(($!)?": $!":""))
904            unless ($self->PrintSock($socket,&GetCom( comCOM , $$ , comDONE )));
905
906    } else {
907        $self->ThisError("Can't answer comDONE: socket not available");
908    }
909    $self->{URGENTCOM} = 0 ;
910}
911
912sub Return {
913    my $self = shift ;
914    $self->Answer( &GetCom( comREQ , JobManager => &GetCom( comJOB , @_ )));
915}
916
917sub Answer {
918    my $self    = shift ;
919    my $Answers = $self->{DADDY} ;
920    return &UPSTAT('Answer-ERROR') unless ( defined($Answers) and @_ );
921
922    my $msg = &GetCom( comCOM, $$ => ( @_ > 1 ? &GetCom( comJOB, @_ ): $_[0] ));
923
924    {
925        my $len = length($msg);
926        &MAXSTAT('Answer-MAXLEN',$len);
927        if ( $len < 50 ) {       &UPSTAT('Answer-LEN-000-049');
928        } elsif ( $len < 100 ) { &UPSTAT('Answer-LEN-050-099');
929        } elsif ( $len < 200 ) { &UPSTAT('Answer-LEN-100-199');
930        } elsif ( $len < 500 ) { &UPSTAT('Answer-LEN-200-499');
931        } elsif ( $len < 1000) { &UPSTAT('Answer-LEN-500-999');
932        } else {                 &UPSTAT('Answer-LEN-1000+'); }
933        if ( $len > $MAX_BUFFER_SIZE - 4096 ) {
934            &UPSTAT("BADCOM-LEN-$len");
935            open LOG, ">>" . &GetTmpFile("$Progname-$maintid-badcom.log")
936                or return &Error("Can't open badcom log file in TMP: $!");
937            print LOG $msg, "\n" ;
938            close(LOG);
939            return &Error("Won't send too long ($len) com");
940        }
941    }
942
943    unless ($self->PrintSock($Answers, $msg )) {
944        &Debug("Answer not sent, ".__PACKAGE__.", l. ".__LINE__." ($!-$?-$@)");
945        return 0 ;
946    }
947}
948
949sub Request {
950    my $self = shift;
951    return unless @_ ;
952    $self->Answer( &GetCom( comREQ , @_ ) );
953}
954
955sub getAnswers { # Should only be used by parent thread
956    my $self = shift;
957    my $max  = $COM_BURST ;
958    my @mesg = () ;
959
960    while ( $max-- and my $msg = $self->ReadSock($self->{SON}) ) {
961        next unless (defined($msg));
962        chomp $msg ;
963        next unless $msg ; # Skip empty lines
964        {
965            my $len = length($msg);
966            &MAXSTAT('ANSWER-MAXLEN',$len);
967            if ( $len < 50 ) {       &UPSTAT('ANSWER-LEN-000-049');
968            } elsif ( $len < 100 ) { &UPSTAT('ANSWER-LEN-050-099');
969            } elsif ( $len < 200 ) { &UPSTAT('ANSWER-LEN-100-199');
970            } elsif ( $len < 500 ) { &UPSTAT('ANSWER-LEN-200-499');
971            } elsif ( $len < 1000) { &UPSTAT('ANSWER-LEN-500-999');
972            } else {                 &UPSTAT('ANSWER-LEN-1000+'); }
973        }
974        push @mesg, $msg ;
975    }
976
977    # Update read line counter
978    &UPSTAT("READLINES-".$self->{KIND},scalar(@mesg));
979
980    return @mesg ;
981}
982
983sub AskToStop {
984    my $self = shift;
985
986    return 1 if ($self->StoppingSince());
987
988    $self->{QUITTING} = [ &gettimeofday() ];
989
990    while ( $self->AskThread( QUIT ) < 0 ) {
991
992        usleep $USLEEP ;
993
994        unless ($self->StoppingSince() < 0.5) {
995            &Info("Can't ask to quit: $! - $? - $@");
996            return 0;
997        }
998    }
999
1000    1 ;
1001}
1002
1003sub AskInit {
1004    return $_[0]->ThreadError("1, can't ask to init '$_[1]'")
1005        unless ($_[0]->AskThread( _INIT , "<init>$_[1]</init>" ));
1006    $_[0]->{_INIT} ++ ;
1007}
1008
1009sub GotInit {
1010    $_[0]->{_INIT} -- ;
1011    &Debug("Thread " . $_[0]->getName . " ENV updated")
1012        unless ( $_[0]->{_INIT} );
1013}
1014
1015sub StoppingSince {
1016    return defined($_[0]->{QUITTING}) ? &tv_interval( $_[0]->{QUITTING} ) : 0 ;
1017}
1018
1019sub DoLog {
1020    my $self = shift ;
1021    &UPSTAT('DOLOG');
1022    my $good = 0 ;
1023
1024    if ( $$ == $maintid ) {
1025        if ($self->is('Logger')) {
1026            $good = $self->PrintSock($self->{SON}, &GetCom( comREQ, TODO, @_ ));
1027
1028        } else {
1029            &debugdev("Not expected to handle '@_' here");
1030            &UPSTAT('DOLOG-ERROR');
1031        }
1032
1033    } else {
1034        $good = $self->PrintSock($self->{DADDY}, &GetCom( comCOM , $$ ,  @_ ));
1035    }
1036
1037    unless ($good) {
1038        &UPSTAT('BAD-DOLOG');
1039        return 0 ;
1040    }
1041}
1042
1043sub END {
1044    my $self = $THIS;
1045
1046    if (defined($self)) {
1047        &Debug("Thread T$$ was " . ($self->{DO_QUIT}?"quitting ":"") .
1048            "on job $self->{JOB}")
1049                if $self->{JOB} ;
1050
1051        # Return any recent Sig message when thread seems aborting unexpectedly
1052        if ( $$ != $maintid and ! $self->{DO_QUIT} and ! $self->is('JobManager')
1053            and $self->{JOB} and defined($self->{DADDY})
1054            and (@SigWarn or @SigInfo))
1055        {
1056            # Try to advertize JobManager
1057            map { $self->Return(
1058                $self->{JOB} => &GetCom( comINF, Warning => $_ )) } @SigWarn ;
1059            map { $self->Return(
1060                $self->{JOB} => &GetCom( comINF, Info => $_ )) } @SigInfo ;
1061            map { $self->Return(
1062                $self->{JOB} => &GetCom( comINF, Debug => $_ )) } @SigDebug ;
1063            $self->Return( $self->{JOB} , 255 );
1064            $self->AnswerDone( $self->{JOB} );
1065            $self->{JOB} = "" ;
1066        }
1067
1068        # Update loggertid to log directly
1069        &LogSigMessage();
1070    }
1071}
1072
1073sub PreDESTROY {}
1074
1075sub DESTROY {
1076    my $self = shift;
1077
1078    $self->PreDESTROY();
1079
1080    if (!defined($self->{TOCLEAN})) {
1081        # Return any recent Sig message when thread seems aborting unexpectedly
1082        if ( $$ != $maintid and ! $self->{DO_QUIT} and ! $self->is('JobManager')
1083            and ( $self->{JOB} or !defined($self->{DADDY}) )
1084            and ( @SigWarn or @SigInfo ) )
1085        {
1086            &Alert("T$$ seems aborted abnormally".(defined($self->{DADDY})?"":
1087                " with socket to parent lost"), @SigWarn, @SigInfo, @SigDebug );
1088        }
1089
1090        # Update loggertid to log directly
1091        &LogSigMessage();
1092
1093        # Debug age
1094        &Debug(sprintf("%s Age = %.2f s", $self->{NAME},
1095            &tv_interval( $self->{FORKED} )))
1096                if ($self->isForked());
1097
1098        # Must kill forked thread if alive
1099        if ( $self->isForked() and $self->isAlive() ) {
1100            &Debug("Killing thread object $self->{NAME}");
1101            kill 9, $self->{TID} ;
1102        }
1103
1104        if (defined($self->{ERROR})) {
1105            &Error("Thread type object $self->{NAME} destroyed with error #" .
1106                $self->{ERROR});
1107        } else {
1108            &Debug("Thread object $self->{NAME} destroyed");
1109        }
1110
1111        &Debug("Thread object $self->{NAME} status: $! - $? - $@")
1112            if ( $! or $? or $@ );
1113    }
1114
1115    map {
1116        shutdown( $_, 2 )
1117    } grep {
1118        defined($_) and (! $self->{TID} or $$ == $maintid)
1119    }
1120        ( $self->{SON} , $self->{DADDY} );
1121
1122    &Debug(__PACKAGE__." T".($self->{TID}?$self->{TID}:$$)." memory cleaned");
1123}
1124
1125############################### Dedicated socket communication members
1126sub ReadSock {
1127    &TIMESTAT('ReadSock');
1128    my $self = shift ;
1129    my $Socket = shift ;
1130
1131    my $fh = $lockfh{$Socket};
1132    @{$ReadBuffer{$Socket}} = () unless (defined($ReadBuffer{$Socket}));
1133
1134    my $locked = 0 ;
1135    my $size = 0 ;
1136    my ( $bigeventcheck , $bigevent ) = ( 1 , 0 );
1137    my ( $sysreadbuffer, $buffer ) = ( "" , "" );
1138
1139    $Socket->blocking(0);
1140
1141    $locked = &TakeComLocking($fh) if ($COM_LOCKING and $maintid);
1142
1143    $! = 0 ;
1144
1145    while (defined($size+=sysread($Socket, $sysreadbuffer, $MAX_BUFFER_SIZE))) {
1146
1147        last unless $size ;
1148
1149        $buffer .= $sysreadbuffer ;
1150
1151        # bigread-event: Handle case we have fullfil our read buffer, we must
1152        # complete with next read so loop to next sysread
1153        # Test modulo bigeventcheck to handle the case we fullfil more than once
1154        last unless ( $size % $MAX_BUFFER_SIZE == $bigeventcheck );
1155
1156        &UPSTAT('BIGREAD-EVENT'.&SelfStatId($self,'full'));
1157        $bigeventcheck = ++ $bigevent ;
1158        $sysreadbuffer = "" ;
1159    }
1160
1161    # Don't stats BUSY ressource
1162    &UPSTAT('errno-'.int($!)."-$!") if ( $! and $! != 11 );
1163
1164    if ($buffer) {
1165        my @lines = split(/\n/,$buffer);
1166        my $count = scalar(@lines) ;
1167        my $check = comTEST ;
1168        # Check buffer could contain a comTEST event as test could involve
1169        # performance issue
1170        if ( $buffer =~ /$check/ ) {
1171            my @test ;
1172            # Check really now there's a formated comTEST
1173            @lines = grep {
1174                @test = &IsCom( comCOM , $_ );
1175                $test[1] !~ /^$check$/
1176                } @lines ;
1177            if (@lines != $count ){
1178                &UPSTAT('GET-COMTEST'.&SelfStatId($self));
1179                &Debug("Received comTEST for " . $self->getName)
1180                    if $ADVANCED_DEBUGGING ;
1181                $self->AnswerComDone ;
1182            }
1183        }
1184
1185        # Keep lines in socket dedicated buffer
1186        push @{$ReadBuffer{$Socket}} , @lines ;
1187
1188        # Keep some monitoring scalar
1189        $STATS{'READ-LINES-DUMP'.&SelfStatId($self)} = "@lines" if $bigevent ;
1190        &MAXSTAT('MAX-READ-LINES',scalar(@lines));
1191        &MAXSTAT('MAX-READSIZE',$size);
1192        &MAXSTAT('MAX-READBUFFER-LINES',scalar(@{$ReadBuffer{$Socket}}));
1193    }
1194
1195    &UPSTAT('READSOCK-ERR-'.$!.&SelfStatId($self,'full'))
1196        unless (defined($size));
1197
1198    &ReleaseComLocking($fh) if ( $locked );
1199
1200    &TIMESTAT('ReadSock');
1201    return shift @{$ReadBuffer{$Socket}} ;
1202}
1203
1204sub PrintSock {
1205    my $self = shift ;
1206    &UPSTAT('PRINTSOCK-CALL'.&SelfStatId($self));
1207    my $Socket = shift ;
1208    my $locked = 0 ;
1209
1210    # Return printing immediatly unless we should use com locking
1211    unless ($COM_LOCKING and $maintid) {
1212        # maintid is always set after perl is loaded
1213        my $Ret = &RealPrintSock( $self, $Socket, @_ );
1214        return $Ret ;
1215    }
1216
1217    # Only open lockfile one time by process, and keep it opened
1218    my $fh = $lockfh{$Socket} ;
1219    unless (defined($fh)) {
1220        $SocketProc{$Socket} = "$maintid-" . $self->getKind()
1221            unless (defined($SocketProc{$Socket}));
1222        my $lockfile = &GetTmpFile("$SocketProc{$Socket}.comlock");
1223
1224        if (defined($lockfh{$lockfile})) {
1225            $lockfh{$Socket} = $lockfh{$lockfile} ;
1226
1227        } else {
1228            unless ( -e $lockfile ) {
1229                open $lockfh{$Socket} , ">" . $lockfile ;
1230                close($lockfh{$Socket});
1231            }
1232            open $lockfh{$Socket} , "+<" . $lockfile
1233                or delete $lockfh{$Socket},
1234                return &debugdev("Can't open '$lockfile' com lock file: $!");
1235            $lockfh{$lockfile} = $lockfh{$Socket} ;
1236        }
1237        $fh = $lockfh{$Socket} ;
1238    }
1239
1240    $locked = &TakeComLocking($fh);
1241
1242    my $Ret = &RealPrintSock( $self, $Socket, @_ );
1243
1244    &ReleaseComLocking($fh) if ( $locked );
1245
1246    #&TIMESTAT('PrintSock');
1247    return $Ret ;
1248}
1249
1250sub Bufferize {
1251    my $self = shift ;
1252    my $Socket = shift ;
1253    return [] unless (defined($Socket));
1254    my $ref = shift ;
1255
1256    my $bufname = 'BUFFER-LINES'.&SelfStatId($self) ;
1257    &MAXSTAT($bufname."-LINES",scalar(@{$self->{COMBUFFER}}));
1258
1259    # Bufferize
1260    if ($self->{URGENTCOM}) {
1261        unshift @{$self->{COMBUFFER}} , @{$ref} ;
1262        return $self->{COMBUFFER} ;
1263
1264    } else {
1265        push @{$self->{COMBUFFER}} , @{$ref} ;
1266    }
1267
1268    # Test if remote buffer may be full to return empty array ref
1269    return &RemoteIsFull($self,$Socket) ? [] : $self->{COMBUFFER} ;
1270}
1271
1272sub RemoteIsFull {
1273    my $self = shift ;
1274    return 0 if ($self->{URGENTCOM});
1275    my $Socket = shift ;
1276    return 0 unless (defined($Socket));
1277    my $ref = shift ;
1278    local $" = '' ;
1279    my $len = defined($ref) ? length("@{$ref}") : 0 ;
1280    $len += length(@{$self->{COMBUFFER}}[0]) + 1
1281        if ( defined($self->{COMBUFFER}) and @{$self->{COMBUFFER}} );
1282    $len += $BUFLEN{$Socket}  + 4096 ;
1283    if ( $len > $MAX_BUFFER_SIZE ) {
1284        &UPSTAT('REMOTE-FULL-BUFFER-CHECK'.&SelfStatId($self,'full'));
1285        return $len ;
1286    }
1287    return 0 ;
1288}
1289
1290sub RemoteBufSizeUpdate {
1291    my $self = shift ;
1292    my $Socket = shift ;
1293    $BUFLEN{$Socket} += shift ;
1294    $STATS{'CUR-BUFFER-CONTROL'.&SelfStatId($self,'full')} = $BUFLEN{$Socket} ;
1295}
1296
1297my %ComTest = () ;
1298sub ResetRemoteBufSize {
1299    my $self = shift ;
1300    my $Socket = shift ;
1301    $Socket = $self->{SON} unless (defined($Socket));
1302    $BUFLEN{$Socket} = 0 ;
1303    delete $ComTest{$Socket} if (defined($ComTest{$Socket}));
1304}
1305
1306sub SelfStatId {
1307    my $self = shift ;
1308    return "" unless ( defined($self) and $$ == $maintid );
1309    return '-' . ( @_ ? $self->getName : $self->getKind );
1310}
1311
1312sub RetryLater {
1313    my $self   = shift ;
1314    my $buffer = shift ;
1315    my $print  = shift ;
1316    &UPSTAT('SOCK-RETRYLATER-EVENT'.&SelfStatId($self));
1317
1318    unshift @{$buffer}, @{$print} ;
1319
1320    return scalar(@{$print});
1321}
1322
1323sub RealPrintSock {
1324    &TIMESTAT('RealPrintSock');
1325    my $self = shift ;
1326    my $Socket = shift ;
1327
1328    # Below syntax is right but involve a bug
1329    #my $urgent = (defined($self) and exists($self->{URGENTCOM})) ?
1330    #    $self->{URGENTCOM} : 0 ;
1331
1332    # This one is the last from a2p v1.020 without the bug
1333    my $urgent = defined($self) and $self->{URGENTCOM} ;
1334
1335    my $count ;
1336    local $" = "\n" ;
1337    my ( $total , $lines , $test ) = ( 0 , 0 , 0 );
1338    my @print = () ;
1339
1340    $! = 0 ;
1341
1342    my $buffer = &Bufferize( $self, $Socket , \@_ );
1343
1344    while ( @{$buffer} or &RemoteIsFull( $self, $Socket ) ) {
1345
1346        if ( @{$buffer} ) {
1347            $lines ++;
1348            my $line = shift @{$buffer} ;
1349            push @print , $line ;
1350        }
1351
1352        $test = &RemoteIsFull( $self, $Socket, \@print );
1353        #$test = $self->{URGENTCOM} unless ( $test or ! defined($self));
1354
1355        if ( ! @{$buffer} or $test or $urgent ) {
1356            my $tries = 0 ;
1357
1358            # Add comTEST to COM
1359            if ( $test ) {
1360                if (! defined($ComTest{$Socket})
1361                or &tv_interval($ComTest{$Socket}) >= 1) {
1362                    # Send again comTEST after 1 second
1363                    local $" = "' '" ;
1364                    my $comtest = &GetCom( comCOM, $$ , comTEST );
1365                    $ComTest{$Socket} = [ &gettimeofday() ] ;
1366                    &UPSTAT('ADD-COMTEST-EVENT'.&SelfStatId($self));
1367                    if ( $test >= $MAX_BUFFER_SIZE ) {
1368                        &UPSTAT('KEEP-COMTEST-EVENT'.&SelfStatId($self));
1369                        if (@print and ! $urgent) {
1370                            my $print = pop @print ;
1371                            $lines -- ;
1372                            unshift @{$buffer}, $print ;
1373                            &debugdev("Retry later '$print'" . ( @print ?
1374                                " keeping '@print'":"") .
1375                                " to be sure to not fullfill remote buffer" .
1376                                " with '$comtest'")
1377                                if ( $ADVANCED_DEBUGGING );
1378                        }
1379
1380                    } elsif ( $ADVANCED_DEBUGGING ) {
1381                        &debugdev("Sending '$comtest'" .
1382                            (@print?" added to '@print'":""));
1383                    }
1384                    # Add comtest to print list
1385                    push @print, $comtest ;
1386
1387                } elsif (@print) {
1388                    &UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
1389                    # Last comTEST was sent recently, we should better directly
1390                    # keep the com and retry it later
1391                    $lines -= &RetryLater($self,$buffer,\@print);
1392                    $total = -1 unless ($total);
1393                    last ;
1394
1395                } else {
1396                    &UPSTAT('SOCK-RECENT-COMTEST-EVENT'.&SelfStatId($self));
1397                    # Remote buffer can be full and we have recently sent a
1398                    # comTEST and there's nothing to send, so just leave
1399                    $total = -1 ;
1400                    last ;
1401                }
1402            }
1403
1404            # Wait when socket is busy
1405            my $print = "@print" . $" ;
1406            while (!defined($count = syswrite($Socket, $print))
1407            and $! == EAGAIN ) {
1408                &UPSTAT('BUSY-SOCKET-EVENT'.&SelfStatId($self));
1409                last if ( $tries ++ > 1000 );
1410                usleep $USLEEP ;
1411                last if $MUSTQUIT ;
1412                &Debug("$Socket socket seems busy with errno: $!")
1413                    unless ( $tries % 20 );
1414            }
1415
1416            if ( $tries ) {
1417                local $" = ' ' ;
1418                &MAXSTAT('CHARSOCK-RETRY',$tries);
1419                $STATS{'CHARSOCK-RETRY'.&SelfStatId($self)} = "@print" ;
1420            }
1421
1422            if (defined($count)) {
1423                if ($count) {
1424                    &RemoteBufSizeUpdate( $self, $Socket, $count );
1425                    $total += $count ;
1426                    &MAXSTAT('CHARSOCK-COUNT',$count);
1427                    &UPSTAT('LINES-SOCK'.&SelfStatId($self,'full'),$lines);
1428
1429                } elsif ($lines) {
1430                    &UPSTAT('LINES-SOCK-NOTSENT'.&SelfStatId($self,'full'),
1431                        $lines);
1432                    $lines -= &RetryLater($self,$buffer,\@print);
1433                    $total = -1 unless ($total);
1434                    last ;
1435                }
1436                last if $test ;
1437
1438            } elsif ($! == EPIPE) {
1439                close($Socket);
1440
1441                if ($self->{TID}) {
1442                    $loggertid = 0 if ($self->getKind eq 'Logger');
1443                    &Warn("Communication has been reset with ".$self->getName);
1444                    $do_check ++ ;
1445
1446                } elsif ( $Socket == $self->{DADDY} ) {
1447                    $loggertid = 0 ;
1448                    $MUSTQUIT ++ ;
1449                    &Warn("Must quit as my socket to parent has been reset");
1450                }
1451
1452            } else {
1453                local $" = "' '" ;
1454                &Debug("$Socket socket is busy, will retrying '@print' later");
1455                $STATS{'SOCKET-ERROR-'.int($!).&SelfStatId($self)} =
1456                    "Can't send '@print'";
1457                $lines -= &RetryLater($self,$buffer,\@print);
1458                $total = -1 unless ($total);
1459                last ;
1460            }
1461
1462            @print = () ;
1463        }
1464
1465        last if $MUSTQUIT ;
1466    }
1467
1468    &TIMESTAT('RealPrintSock');
1469
1470    if ($total) {
1471        &MAXSTAT('MAX-TOTALSOCK'.&SelfStatId($self),$total);
1472        &MAXSTAT('MAX-LINESOCK'.&SelfStatId($self),$lines);
1473        return @{$buffer} ? - @{$buffer} : ( $lines ? $lines : $total );
1474    }
1475
1476    return 0 ;
1477}
1478
1479END {
1480    if ( $COM_LOCKING ) {
1481        # Close all locking file handles and clean any locking file
1482        map { close($lockfh{$_}) ; /^$SERVICE_TMP/ and -e $_ and unlink $_ }
1483            keys(%lockfh);
1484    }
1485}
1486
1487&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
1488
14891;
Note: See TracBrowser for help on using the repository browser.