source: A2P/a2p/A2P/Listener.pm @ 3

Last change on this file since 3 was 3, checked in by guillaume, 17 years ago
  • AUTHORS: Ajout des différents contributeurs
  • COPYING: Ajout de la licence GPL v3
  • a2p: Préparation des sources pour leur publication sous GPL
  • Property svn:keywords set to Id
File size: 17.4 KB
Line 
1#
2# Copyright (c) 2004-2007 - Consultas, PKG.fr
3#
4# This file is part of A2P.
5#
6# A2P is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# A2P is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with A2P; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19#
20# $Id: Listener.pm 3 2007-10-18 16:20:19Z guillaume $
21#
22
23package A2P::Listener;
24
25# Derived class from Thread.pm
26use base qw(A2P::Thread Exporter);
27
28use strict ;
29use Socket ;
30use IO::Socket ;
31use Time::HiRes qw(ualarm gettimeofday tv_interval) ;
32use A2P::Globals ;
33use A2P::Syslog ;
34use A2P::Com   qw( IsCom GetCom comSAY comREQ comCOM comDONE comASK comJOB );
35use A2P::Tools qw( ShortID );
36use A2P::XML ;
37
38BEGIN {
39    our $VERSION = sprintf "%s", q$Rev: 450 $ =~ /(\d[0-9.]+)\s+/ ;
40}
41our $VERSION ;
42
43my %conn    = () ;
44my %XML     = () ;
45my %timeout = () ;
46my $clientrequest = 0 ;
47my @AUTHORIZED = ( INADDR_LOOPBACK ) ;
48my $ANY_OK  = 0 ;
49my $jobcount = 0 ;
50
51my @timer = &gettimeofday() ;
52
53################################################################################
54##             Listener handling code                                         ##
55################################################################################
56sub Do {
57    my $self = shift ;
58    my $ref  = shift ;
59
60    my $Ret = 1 ;
61
62    my ( @Req , @Ask );
63
64    # Handle response from service to currently connected client as comSAY
65    if ( @Req = &IsCom( comSAY , $$ref )) {
66
67        # Cache the following test as re-used when true
68        my $donetest = ( @Req > 1 ) ? &IsCom( comDONE , $Req[1] ) : 0 ;
69
70        # Anyway count down job done
71        $donetest and --$jobcount ;
72
73        if (defined($XML{$Req[0]}) and defined($conn{$Req[0]})) {
74            # Handle XML request
75            $self->XMLEvent(@Req);
76
77        } elsif (defined($conn{$Req[0]})) {
78            # Handle direct external request
79            &Debug("Con $Req[0]: Sending answer: $Req[1]");
80            $clientrequest = $Req[0] ;
81            $self->Answer( $Req[1] );
82
83            # Reset the time out for this connection
84            $timeout{$Req[0]} = [ &gettimeofday() ] ;
85
86        } else {
87            &Warn("Connection $1 not available, can't send answer: $2");
88            delete $XML{$Req[0]} if (defined($XML{$Req[0]}));
89            delete $conn{$Req[0]} ;
90            # Still return to not close non available connection
91            return --$Ret ;
92        }
93
94        # Then close connection if required
95        if ( $donetest and $CLOSE_CONNECTIONS) {
96            &Info("Con $Req[0]: Closing connection with client");
97            $self->CloseClient( $Req[0] );
98        }
99
100    # Handle client connection as comREQ
101    } elsif ( $clientrequest and @Ask = &IsCom( comASK , $$ref )) {
102
103        if ( $jobcount < $MAXJOBS ) {
104            my $Con = $clientrequest ;
105
106            # Necessary for &Answer to know message is for parent
107            $clientrequest = 0 ;
108
109            &Debug("Con $Con: request to $Ask[0] is '$Ask[1]'");
110            $self->Answer( $Ask[0] , &GetCom( comASK , $Con => $Ask[1] ));
111
112            $jobcount ++ ;
113
114        } else {
115            &Warn("Closing $clientrequest as max jobs count has been reached");
116            $self->Answer( "ERROR: MAX JOBS COUNT REACHED" );
117            $self->CloseClient($clientrequest);
118        }
119
120    # Handle status exchange with parent
121    } elsif ( $$ref =~ m|^STARTED$|i ) {
122        &Debug("I can say service is started");
123        $self->{STARTED} ++ ;
124        $self->Answer($$ref);
125
126    } elsif ( $$ref =~ m|^STATUS$|i ) {
127        $self->Answer($self->{STARTED});
128        &Debug("Returned status '$self->{STARTED}'");
129
130    } else {
131        $self->ThisError("Got bad request '$$ref' for listener");
132        $Ret -- ;
133    }
134
135    return $Ret ;
136}
137
138sub DoBeforeQuit {
139    my $self = shift ;
140
141    $clientrequest = 0 ;
142
143    # Close any client connection
144    map {
145        $clientrequest = $_ ;
146        $self->Answer( "Service is stopping" );
147        $self->CloseClient( $_ )
148    } keys(%conn) ;
149
150    close($self->{SERVER});
151
152    unlink($LISTENER);
153
154}
155
156sub ThreadInit {
157    my $self    = shift ;
158
159    $SIG{'IO'}   = sub { &UPSTAT('GOT-SIGNAL-IO'); } ;
160    $SIG{'PIPE'} = sub { &UPSTAT('GOT-SIGNAL-PIPE'); } ;
161
162    # Open UNIX Socket
163    my $sockaddr = pack_sockaddr_un( $LISTENER );
164
165    socket( $self->{SERVER} , PF_UNIX,SOCK_STREAM , 0 )
166        or return $self->Error("Listener can't create a socket: $!");
167
168    bless $self->{SERVER} , "IO::Socket" ;
169
170    unlink($LISTENER);
171
172    bind ( $self->{SERVER} , $sockaddr )
173        or return $self->Error("Listener can't bind server socket: $!");
174
175    listen( $self->{SERVER} , SOMAXCONN )
176        or return $self->Error("Listener can't listen server: $!");
177
178    # Handle a list of available servers, only UNIX socket server by default
179    $self->{SERVERS} = [ $self->{SERVER} ];
180
181    # Open TCP/IP Socket
182    if ($USE_TCPIP_SOCKET) {
183        # Checking port
184        &Warn("Unsupported port '$USE_TCPIP_PORT' for TCP/IP listening," .
185            " resetting port to default " . ( $USE_TCPIP_PORT = 6400 ) )
186            unless ( $USE_TCPIP_PORT > 0 and $USE_TCPIP_PORT < 65000 );
187
188        my $proto = getprotobyname('tcp');
189        socket( $self->{TCPSERVER}, PF_INET, SOCK_STREAM, $proto )
190            or return $self->Error("Listener can't create a TCP/IP socket: $!");
191
192        bless $self->{TCPSERVER} , "IO::Socket" ;
193
194        setsockopt( $self->{TCPSERVER}, SOL_SOCKET, SO_REUSEADDR, pack("l", 1))
195            or return $self->Error("Can't set TCP/IP REUSEADDR options: $!");
196
197        setsockopt( $self->{TCPSERVER}, SOL_SOCKET, SO_KEEPALIVE, pack("l", 1))
198            or return $self->Error("Can't set TCP/IP KEEPALIVE options: $!");
199
200        if ($USE_ONLY_LOOPBACK) {
201            $sockaddr = sockaddr_in( $USE_TCPIP_PORT , INADDR_LOOPBACK );
202
203        } else {
204            $sockaddr = sockaddr_in( $USE_TCPIP_PORT , INADDR_ANY );
205        }
206        bind( $self->{TCPSERVER}, $sockaddr )
207            or return $self->Error("Can't bind server on TCP/IP socket: $!");
208
209        listen( $self->{TCPSERVER} , SOMAXCONN )
210            or return $self->Error("Can't listen as TCP/IP server: $!");
211
212        # Add TCP/IP server to servers list
213        push @{$self->{SERVERS}}, $self->{TCPSERVER} ;
214
215        @AUTHORIZED = ( INADDR_LOOPBACK ) ;
216        foreach my $host (split(/\s+/,$AUTHORIZED_HOST)) {
217            if ( $host =~ /^any$/i or $host eq '0.0.0.0' ) {
218                $ANY_OK ++ ;
219                next ;
220            }
221            my $packed = inet_aton($host) ;
222            if (defined($packed)) {
223                push @AUTHORIZED , $host ;
224                &Debug("IP ".inet_ntoa($packed)." is authorized to access a2p");
225
226            } else {
227                &Warn("Skipping $host as its ip wasn't resolved");
228            }
229        }
230    }
231
232    %conn    = () ;
233    %timeout = () ;
234    $clientrequest = 0 ;
235}
236
237sub CloseClient {
238    my $self = shift ;
239    my $Conn = shift ;
240    my $Err  = shift || 0 ;
241    my $Mesg = shift || "" ;
242
243    $self->ThisError($Mesg), $self->Answer($Mesg) if $Err ;
244
245    my $socket = defined($conn{$Conn}) ? $conn{$Conn} : 0 ;
246
247    if ($socket) {
248        &Info("Con $Conn: Socket shutdown");
249        shutdown( $socket , 2 );
250
251    } else {
252        &Info("Con $Conn: Socket still shutdown");
253    }
254    delete $conn{$Conn} ;
255    delete $timeout{$Conn} ;
256}
257
258sub NotConnected {
259    my $self = shift ;
260    my $Conn = shift ;
261    my $socket = defined($conn{$Conn}) ? $conn{$Conn} : 0 ;
262    if ($socket) {
263        $! = 0 ;
264        my $scheck = '' ;
265        vec($scheck,fileno($socket),1) = 1;
266        unless (select( $scheck , undef , undef, 0)) {
267            &UPSTAT('IS-CONNECTED');
268            &UPSTAT('CON-errno-'.int($!)."-$!") if $! ;
269            return 0 ;
270        }
271        &Debug("Con $Conn: ".int($!)." - $!") if $! ;
272    }
273
274    &UPSTAT('IS-DISCONNECTED');
275    &Debug("Con $Conn: Connection lost");
276    return 1 ;
277}
278
279sub GetRequest {
280    my $self     = shift ;
281    my $Requests = shift ;
282    my $Ret      = undef ;
283
284    # Check to touch LISTENER time access
285    if ( @timer and &tv_interval(\@timer) > 10 ) {
286        if ( utime undef, undef, $LISTENER ) {
287            @timer = &gettimeofday() ;
288
289        } else {
290            &Warn("Can't touch '$LISTENER' socket: $!");
291            # Won't touch anymore the socket after one error
292            @timer = () ;
293        }
294    }
295
296    # Check connections time out
297    map {
298        if ( &tv_interval($timeout{$_}) > $CLIENT_TIMEOUT ) {
299            $clientrequest = $_ ;
300            if ($self->Answer("ANSWERED REQUEST TIME OUT")) {
301                &Info("Reach time out for connection $_");
302                $self->CloseClient($_,$CLIENT_TIMEOUT,
303                    "ANSWERED REQUEST TIME OUT for $_ client");
304                $! = 0 ;
305
306            } else {
307                &Debug("Deleting still closed $_ connection");
308                $self->CloseClient($_);
309            }
310
311        } elsif ($self->NotConnected($_)) {
312            &Debug("Deleting lost $_ connection");
313            $self->CloseClient($_);
314        }
315    } keys(%timeout);
316
317    # Reset client request id flag
318    $clientrequest = 0 ;
319
320    my ( $Client , $addr );
321    if ( ! defined($Ret = $self->ReadSock($Requests))
322    and keys(%conn) < $MAXCONNECTIONS ) {
323
324        foreach my $Server ( @{$self->{SERVERS}} ) {
325
326            # Accept should not be blocking
327            $Server->blocking(0);
328
329            if ( $addr = accept($Client,$Server) ) {
330
331                bless $Client , "IO::Socket" ;
332                $Client->autoflush(1);
333
334                # Check client as the right to access us
335                # in the case of TCP/IP server
336                if ( $USE_TCPIP_SOCKET and $Server == $self->{TCPSERVER} ) {
337                    my($port,$ip) = sockaddr_in($addr);
338                    # If use_only_loopback is set, it is safe to accept
339                    # Otherwise check other authorized ips
340                    unless ( $USE_ONLY_LOOPBACK or $ANY_OK
341                    or grep { /^$ip$/ } @AUTHORIZED ) {
342                        &Warn("Blocking $ip without access authorization");
343                        shutdown( $Client , 2 );
344                        next ;
345                    }
346                }
347
348                my $ConnId ;
349                # ConnId is unique in one listener thread
350                do {
351                    $ConnId = "CONN" . uc(&ShortID(5))
352                } until ( ! defined($conn{$ConnId}) );
353
354                &Info("Con $ConnId: Got connection with a client");
355                &Debug("Got connection $ConnId on " . $Client);
356
357                $Ret = $self->ReadSock($Client) ;
358                $conn{$ConnId} = $Client ;
359                $timeout{$ConnId} = [ &gettimeofday() ] ;
360                $clientrequest = $ConnId if (defined($Ret));
361
362                last ;
363            }
364        }
365    }
366
367    # Now eventually get request from opened connections
368    if (!defined($Ret)) {
369        foreach my $ConnId (keys(%conn)) {
370            $Client = $conn{$ConnId} ;
371            if (defined($Ret = $self->ReadSock($Client))) {
372                $clientrequest = $ConnId ;
373                last ;
374            }
375        }
376    }
377
378    return $MUSTQUIT ? 1 : $Ret ;
379}
380
381sub XML {
382    my $self   = shift ;
383    my $buffer = shift || "" ;
384
385    # Handle external XML request
386    if ( $clientrequest and $jobcount < $MAXJOBS) {
387        &Debug("Got an external XML request");
388
389        # We have to concatenate any other lines until got a valid xml request
390        # 1. First create XML object
391        my $XML = $XML{$clientrequest} = new A2P::XML(\$buffer);
392
393        # 2. Concatenate on the buffer to get the all XML
394        # Remove any tag corresponding to XML version
395        $buffer =~ s|<\?xml\s.*\?>|| ;
396
397        my $Client = $conn{$clientrequest} ;
398        $buffer = $self->ReadSock($Client) unless ( $buffer =~ m|^<\w+| );
399
400        # Found XML root node to know how XML should be closed
401        my ( $root ) = $buffer =~ m|^<(\w+)\s*[^>]*/*>| ;
402
403        unless (defined($root) and $root) {
404            $self->ThisError("Can't find XML root in '$buffer' XML request");
405            $buffer = "<a2p/>" ;
406            $XML->isA2P ;
407            $XML->A2Pmessage("A2P-ABTERM = NO XML ROOT FOUND");
408            $self->XMLanswer($clientrequest);
409            $self->CloseClient($clientrequest);
410            return 0 ;
411        }
412
413        &Debug("'$root' XML root node found");
414
415        # Only concatenate if we don't get empty root node
416        unless ( $buffer =~ m|^<($root)[^>]*/>| ) {
417            my $line = "" ;
418
419            # Get next content on socket until agregation seems finalized as xml
420            while (defined($Client) and $Client
421            and defined($line) and $line !~ m|</$root>| ) {
422
423                # Loop on socket
424                $line = $self->ReadSock($Client) ;
425
426                &Debug("Concatenate XML with '$line'");
427                $XML->concatenate($line . "\n");
428            }
429        }
430
431        &Debug("Full request is '$buffer'") unless (length($buffer)>100);
432
433        unless ( $XML->isA2P ) {
434            my $abterm = "XML request is not for me" ;
435            $self->ThisError($abterm);
436            $XML->A2Pmessage("A2P-ABTERM = $abterm");
437            $XML->A2Pabterm ;
438            $self->XMLanswer($clientrequest);
439            $self->CloseClient($clientrequest);
440            return 0 ;
441        }
442
443        if ( $XML->getPATH ) {
444
445            my $file = $XML->getPATH ;
446            if ( -e $file ) {
447                # Ask to do the job
448                my $request = &GetCom( comASK ,
449                    JobManager => &GetCom( comJOB , $XML->getid , $file ));
450                $self->Do( \$request ); # This also update jobcount
451
452            } else {
453                my $abterm = "Can't convert not existing '$file'" ;
454                $self->ThisError($abterm);
455                $XML->A2Pmessage("A2P-ABTERM = $abterm");
456                $XML->A2Pabterm ;
457                $self->XMLanswer($clientrequest);
458                $self->CloseClient($clientrequest);
459                return 0 ;
460            }
461
462        } else {
463            my $abterm = "Mal-formed XML request not supported" ;
464            $self->ThisError($abterm);
465            $XML->A2Pmessage("A2P-ABTERM = $abterm");
466            $XML->A2Pabterm ;
467            $self->XMLanswer($clientrequest);
468            $self->CloseClient($clientrequest);
469            return 0 ;
470        }
471
472    } elsif ($clientrequest) {
473        &Warn("Closing $clientrequest as max jobs count has been reached");
474        &Debug("Closing $clientrequest as $jobcount jobcount >= to $MAXJOBS");
475        $buffer = "<a2p/>" ;
476        my $XML = $XML{$clientrequest} = new A2P::XML(\$buffer);
477        $XML->isA2P ;
478        $XML->A2Pmessage("A2P-ABTERM = MAX JOBS COUNT REACHED ON A2P SERVICE");
479        $self->XMLanswer($clientrequest);
480        $self->CloseClient($clientrequest);
481
482    } else {
483        $self->ThisError("[DEV] Unexpected such '$buffer' XML request");
484    }
485}
486
487my $qr_version_event =
488    qr/Step 2, Processing AFP JOB '([-a-zA-Z0-9._]+)' with a2p v([0-9.]+)/ ;
489
490sub XMLEvent {
491    my $self  = shift ;
492    my $conn  = shift ;
493    my $event = shift ;
494
495    unless (defined($XML{$conn})) {
496        &Warn("Can't handle '$event' XML event as '$conn' XML is destroyed");
497        return 0 ;
498    }
499
500    my $XML = $XML{$conn} ;
501
502    # Get version and jobname if available
503    # Pattern: Step 2, Starting AFP JOB '$jobname' with a2p v$version
504    return $XML->A2Pmessage( "A2P-Version = $2", "A2P-Jobname = $1" )
505        if ( $event =~ $qr_version_event );
506
507    # Log any ABTERM event
508    if ( $event =~ /abterm:\s*(.*)$/i ) {
509        $XML->A2Pmessage("A2P-ABTERM = $1");
510        return $XML->A2Pabterm ;
511    }
512
513    # Get zipfile
514    return $XML->A2Pmessage("A2P-Zipfile = $1")
515        if ( $event =~ /ZIPFILE=(.*)$/ );
516
517    # Get PDF for each task
518    # Pattern: Generating PDF '$PDFNAME'
519    return $XML->A2Ppdf( $1 )
520        if ( $event =~ /Generating PDF '([-a-zA-Z0-9._]+\.pdf)'/ );
521
522    # Finally check state and return answer to client
523    if ( $event =~ /JOBSTATUS:\s+(\w+)/ ) {
524        my $state = $1 ;
525        $XML->A2Pmessage( "A2P-State = $state" );
526        $XML->setstate( $state );
527        $self->XMLanswer($conn);
528
529    } else {
530        &Debug("Forgetting info '$event'");
531    }
532}
533
534sub XMLanswer {
535    # Return answer and remove XML object
536    my $self = shift ;
537    my $conn = shift ;
538    my $Socket = $conn{$conn} ;
539    my $XML    = $XML{$conn} ;
540    if (defined($Socket) and defined($XML)) {
541        $Socket->autoflush(1);
542        print $Socket $XML->A2Panswer(), "\n";
543
544    } elsif (!defined($Socket)) {
545        &Warn("Can't send XML answer on '$conn' connection as it seems lost");
546
547    } else {
548        &Warn("Can't send XML answer on no XML was defined");
549    }
550    delete $XML{$conn} ;
551}
552
553sub Answer {
554    my $self = shift ;
555
556    my $Socket = $clientrequest ? $conn{$clientrequest} : $self->{DADDY} ;
557    return 0 unless (defined($Socket));
558
559    my @Answer =
560        &GetCom( comCOM , $$ => ( @_ > 1 ? &GetCom( comREQ , @_ ) : $_[0] ));
561
562    my $Ret = $clientrequest ? print $Socket @Answer,"\n" :
563        $self->PrintSock( $Socket, @Answer )
564        or &Debug("Listener answer not sent, ".__PACKAGE__.
565            ", l. ".__LINE__." ($!-$?-$@)");
566
567    &Debug("Bad listener answer request: @_") if ( $#_ > 1);
568
569    return $Ret ;
570}
571
572&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
573
5741;
Note: See TracBrowser for help on using the repository browser.