source: A2P/a2p/A2P/threads.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 12.5 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: threads.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22
23package A2P::threads;
24
25use strict ;
26use integer ;
27use UNIVERSAL 'isa' ;
28use Time::HiRes qw( usleep gettimeofday tv_interval ) ;
29use A2P::Globals ;
30use A2P::Syslog ;
31use A2P::Signal ;
32use A2P::Com qw( IsCom comSAY );
33
34BEGIN {
35    use Exporter ();
36
37    our ( $VERSION , @ISA , @EXPORT , @EXPORT_OK );
38
39    $VERSION = sprintf "%s", q$Rev: 490 $ =~ /(\d[0-9.]+)\s+/ ;
40
41    @ISA = qw(Exporter);
42    @EXPORT = qw( %sons %sonname %SonKind %HasQuit @answers $started );
43    @EXPORT_OK = qw(
44        &THCountAll &THCountStopping &is_son &FromToDoCom &SetNewLogger
45        &ThreadsStarter &ThreadsChecker &CheckChildrenQuitTimeOut
46        &is_really_logger &threads_internal_updates &THList
47        &threadSleep &reduceSleep
48        );
49}
50our $VERSION ;
51
52#############################################################################
53##                 Declaration and vars update                             ##
54#############################################################################
55
56our %SonKind = ();
57
58# Sons/Threads and Quit status hash list, keys are processes PID
59our %sons ;
60our %sonname ;
61our %HasQuit ;
62my %LastRestart ;
63
64# Children answers list
65our @answers ;
66
67my $currentlogger = 0 ;
68our $started = 0 ;
69my $controlrestart = 0 ;
70my $restartcheck = [ &gettimeofday() ] ;
71
72# Should be called at any conf reload request by the parent
73sub threads_internal_updates {
74    %SonKind = (
75        #######################################################################
76        #                Thread Alias       Thread Factor
77        #######################################################################
78                    'JobManager'    =>  1,
79                    'SpoolManager'  =>  $SCAN_SPOOL ? 1 : 0,
80                    'Listener'      =>  1,
81                    'Logger'        =>  $START_LOGGER,
82                    'Converter'     =>  $MAXJOBS,
83                    'BackEnd'       =>  $MAXJOBS * $BACKEND_FACTOR,
84                    'Archiver'      =>  $ARCH_ENABLED < 100 ?
85                        $BACKEND_FACTOR * $ARCH_ENABLED : $BACKEND_FACTOR,
86                    'EService'      =>  $ESERVICE_ENABLED < 100 ?
87                        $BACKEND_FACTOR * $ESERVICE_ENABLED : $BACKEND_FACTOR
88
89        #######################################################################
90        );
91}
92
93&threads_internal_updates();
94
95#############################################################################
96##                 Exported tools used in any process                      ##
97#############################################################################
98my $SleepFactor = 1 ;
99
100sub threadSleep {
101    # Short sleep (USLEEP/2) when quitting
102    return usleep $USLEEP >> 1 if ($do_quit);
103
104    usleep $SleepFactor * $USLEEP ;
105
106    if ($DYNAMIC_USLEEP) {
107        &UPSTAT('LOOPS-SLEEP');
108        &UPSTAT('LOOPS-SKIPPED',$SleepFactor-1);
109
110        # Update SleepFactor and its monitoring
111        &MAXSTAT('SLEEP-FACTOR',++ $SleepFactor)
112            unless ( $SleepFactor * $USLEEP >= $MAX_DYNAMIC_USLEEP );
113    }
114}
115
116sub reduceSleep {
117    return unless ( $DYNAMIC_USLEEP and $SleepFactor > 1 );
118
119    # Keep stats on SleepFactor when it has grown, unless called with any value
120    &UPSTAT('MAXSLEEPFACTOR-REACHED-'.$SleepFactor) unless @_ ;
121
122    # Decrease rapidly SleepFactor by binary division until reach 1
123    # Greater than 4, we divide by 2^2=4 else by 2^1=2
124    $SleepFactor >>= $SleepFactor > 4 ? 2 : $SleepFactor > 1 ;
125}
126
127#############################################################################
128##                 Exported tools used in parent                           ##
129#############################################################################
130
131# Thread check mini subs
132sub THList {
133    return grep { $sons{$_}->is($_[0]) } keys(%sons) ;
134}
135
136sub THAlive {
137    return grep { ! $HasQuit{$_} } &THList($_[0]) ;
138}
139
140sub THCountAll {
141    return scalar(grep { ! $sons{$_}->StoppingSince() } keys(%sons));
142}
143
144sub THCountStopping {
145    return scalar(grep {   $sons{$_}->StoppingSince() } keys(%sons));
146}
147
148sub is_son {
149    return 0 unless (defined($sons{$_[0]}));
150    ref($sons{$_[0]}) =~ /^A2P::(.*)$/ ;
151    return defined($1) ? defined($SonKind{$1}) : 0 ;
152}
153
154sub is_really_logger {
155    return &is_son($_[0]) and $sons{$_[0]}->is('Logger') ;
156}
157
158sub FromToDoCom {
159    my $reqid   = shift ;
160    my $kind    = shift ;
161    my @threads = $kind =~ /^\d+$/ ? ( $kind ) : &THAlive($kind) ;
162
163    &Error("No such $kind thread for a request")
164        unless @threads ;
165
166    # Find one available thread to do the request
167    while ( @threads ) {
168        my $chance = int(rand(@threads));
169        my $thread = shift @threads ;
170        if ($chance) {
171            # Choose randomly which process to try to not send request to the
172            # same free process
173            push @threads, $thread ;
174            next ;
175        }
176
177        my $requester = $sons{$thread}->isBusyBy() ;
178        if ($requester) {
179            &Debug($sonname{$thread} . " still busy by " . $sonname{$requester},
180                "Business is on '" . $sons{$thread}->getBusyReq() . "'");
181
182        } else {
183            # We don't want to busy JobManager and Listener unless by parent
184            $reqid = 0 if ( $reqid != $$ and
185                ($sons{$thread}->is('JobManager','Listener')));
186
187            # Keep who generate the request to return next answer
188            $sons{$thread}->setBusy( $reqid , @_ )
189                unless ( ! $reqid or &IsCom( comSAY , $_[0] ));
190
191            $sons{$thread}->AskThread( TODO , @_ );
192
193            &Debug("Request transmitted to " . $sonname{$thread});
194            return 1 ;
195        }
196    }
197
198    return 0 ;
199}
200
201sub SetNewLogger {
202    # Argument can be < 0 to initialize logger with current logger thread
203    $loggertid = shift if @_ ;
204    ( $loggertid ) = &THList('Logger') if ( $loggertid < 0 );
205    return $loggertid if ( $loggertid == $currentlogger );
206
207    &SetLogger( $sons{$loggertid} );
208    map { $sons{$_}->SwitchLogger( $loggertid ) } keys(%sons) ;
209    $currentlogger = $loggertid ;
210}
211
212#############################################################################
213##                 Thread starter and checker code                         ##
214#############################################################################
215sub ThreadsStarter {
216    my $Sub  = shift ;
217    my $debug = $_[0] ? shift : 0 ;
218
219    my $MaxTask = $MAXTASK * $SonKind{$Sub} ;
220    my $Object = 'A2P::' . $Sub ;
221
222    &Debug("Launching starter with sub '$Sub' and $MaxTask tasks");
223
224    &Debug("Starting $MaxTask $Object thread") if ( $debug and $MaxTask );
225
226    # Start threads if not enough threads are started
227    if ( &THList($Sub) < $MaxTask ) {
228
229        while ( &THList($Sub) < $MaxTask ) {
230
231            &Debug("Creating new $Object thread object");
232
233            my $Child = new $Object ;
234
235            if ( &isa( $Child ,$Object )) {
236                &Debug("$Child created");
237
238            } else {
239                &Error("Can't create $Object thread object");
240                last ;
241            }
242
243            if ( $Child->init() ) {
244
245                my $Tid = $Child->Fork();
246
247                if (defined($Tid)) {
248                    &Debug("$Sub thread started with TID $Tid");
249
250                    $sons{$Tid} = $Child ;
251                    $sonname{$Tid} = $Child->getName ;
252                    $HasQuit{$Tid} = 0 ;
253
254                } else {
255                    &Error("Can't start $Sub thread");
256                    last ;
257                }
258
259            } else {
260                &Error("Can't initialize $Sub thread");
261                last ;
262            }
263        }
264
265    # Or ask threads to check if one can quit
266    } elsif ( &THList($Sub) > $MaxTask ) {
267        my @Childs = &THList($Sub) ;
268        my $count  = scalar(@Childs) - $MaxTask ;
269        while ( $count -- ) {
270            my $child = shift @Childs ;
271
272            &Debug("Asking T$child $Sub thread to stop");
273            $sons{$child}->AskToStop();
274            &SetNewLogger(0) if ( $sons{$child}->getKind() =~ /logger/i );
275            $sonname{$child} .= '*' ;
276            $HasQuit{$child} = $sonname{$child} ;
277        }
278
279    # Or we are checking a thread kind as it can be dead
280    } elsif ($MaxTask) {
281        foreach ( &THList($Sub) ) {
282            $sons{$_}->Delete()
283                unless $sons{$_}->isAlive() ;
284        }
285        &Debug("Still have the right number ($MaxTask) of $Sub thread")
286            if ( &THList($Sub) == $MaxTask );
287    }
288    my @list = &THAlive($Sub) ;
289    &Debug( scalar(@list) . " $Object thread started") if ( $debug and $MaxTask );
290    return @list ;
291}
292
293sub ThreadsChecker {
294    my $Err = 0 ;
295
296    # Update SpoolManager need if necessary
297    if ( $SonKind{SpoolManager} != $SCAN_SPOOL ) {
298        &Debug(($SCAN_SPOOL ? "Activating"  : "Suspending") . " SpoolManager thread");
299        $SonKind{SpoolManager} = $SCAN_SPOOL ;
300    }
301
302    foreach my $kind ( @_ ? @_ : keys(%SonKind) ) {
303        my $need = $MAXTASK * $SonKind{$kind} ;
304        my $todo = $need - &THAlive($kind) ;
305        if ( $todo ) {
306            my $nextmessage ;
307            if ( $todo < 0 ) {
308                &Debug("Shutting down " . abs($todo) . " $kind threads to reach $need");
309                $nextmessage = "Failed to stop threads" ;
310
311            } else {
312                &Alert("Restarting $todo $kind threads to reach ".($need>1?"$need ones":"one"));
313                $nextmessage = "Failed to start threads" ;
314                $controlrestart ++ ;
315                if (&tv_interval($restartcheck) >= 2 ) {
316                    if ( $controlrestart > 5 ) {
317                        &Alert("Stopping service as too much process restart ($controlrestart>5) occurs in the last 2 sec");
318                        $MUSTQUIT ++ ;
319                    }
320                    $controlrestart = 0 ;
321                    $restartcheck = [ &gettimeofday() ] ;
322                }
323            }
324
325            # Don't try again to start a process too quickly
326            if (defined($LastRestart{$kind})) {
327                unless ( &tv_interval($LastRestart{$kind}) >= 5 ) {
328                    &Debug("Will delay the start of $kind thread");
329                    next ;
330                }
331                &Debug("Previous $kind thread was started " .
332                    &tv_interval($LastRestart{$kind}) . " sec ago");
333            }
334            $LastRestart{$kind} = [ &gettimeofday() ];
335
336            my $count = &ThreadsStarter( $kind );
337            # Try again one time as first call could have only clean the hash
338            $count = &ThreadsStarter( $kind ) unless ( $count == $need );
339            unless ( $count == $need ) {
340                &Warn($nextmessage) ;
341                &Warn("Only " . &THAlive($kind) . " threads are availables");
342                $Err ++ ;
343            }
344        }
345    }
346    return $Err ? 0 : 1 ;
347}
348
349my ( $firstcheck , $lastcheck ) = ( [ &gettimeofday() ] , [ ] );
350sub CheckChildrenQuitTimeOut {
351
352    if ( @{$lastcheck} or &tv_interval($firstcheck) >= 1 ) {
353        if ( @{$lastcheck} and &tv_interval($lastcheck) >= 1 ) {
354            &Debug((&THCountAll + &THCountStopping) .
355                " childrens seems to be alive since " . &tv_interval($firstcheck));
356            $lastcheck = [ &gettimeofday() ];
357
358        } elsif ( ! @{$lastcheck} ) {
359            $firstcheck = [ &gettimeofday() ];
360            $lastcheck = [ &gettimeofday() ];
361        }
362    }
363
364    map {
365        if ( ! $HasQuit{$_} and $sons{$_}->isAlive() ) {
366            if ($sons{$_}->SendSigTerm()) {
367                &Debug("Sending SIGTERM to children T$_");
368
369            } elsif ($sons{$_}->SendSigKill()) {
370                &Debug("Sending SIGKILL to children T$_");
371                $sons{$_}->Delete() ;
372                $sonname{$_} .= '+' ;
373                $HasQuit{$_} = $sonname{$_} ;
374            }
375
376        } else {
377            &Debug("Children T$_ has leaved now");
378            push @answers , $sons{$_}->getAnswers() ;
379            $sons{$_}->Delete() ;
380            $sonname{$_} .= '+' ;
381            $HasQuit{$_} = $sonname{$_} ;
382        }
383    } keys(%sons) ;
384}
385
386&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
387
3881;
Note: See TracBrowser for help on using the repository browser.