#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# $Id: Listener.pm 3 2007-10-18 16:20:19Z guillaume $
#

package A2P::Listener;

# Derived class from Thread.pm
use base qw(A2P::Thread Exporter);

use strict ;
use Socket ;
use IO::Socket ;
use Time::HiRes qw(ualarm gettimeofday tv_interval) ;
use A2P::Globals ;
use A2P::Syslog ;
use A2P::Com qw( IsCom GetCom comSAY comREQ comCOM comDONE comASK comJOB );
use A2P::Tools qw( ShortID );
use A2P::XML ;

BEGIN {
    our $VERSION = sprintf "%s", q$Rev: 450 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;

# File-scoped listener state, shared by every sub below:
#   %conn          connection id => client socket
#   %XML           connection id => A2P::XML object of a pending XML request
#   %timeout       connection id => [gettimeofday] of the last activity
#   $clientrequest id of the client currently being served, 0 when the
#                  peer is the parent (see Answer)
#   @AUTHORIZED    packed IPv4 addresses allowed on the TCP/IP server
#   $ANY_OK        true when any host may connect on the TCP/IP server
#   $jobcount      number of jobs asked and not yet reported as done
#   @timer         last time the listener socket file was touched
my %conn = () ;
my %XML = () ;
my %timeout = () ;
my $clientrequest = 0 ;
my @AUTHORIZED = ( INADDR_LOOPBACK ) ;
my $ANY_OK = 0 ;
my $jobcount = 0 ;
my @timer = &gettimeofday() ;

################################################################################
##             Listener handling code                                         ##
################################################################################

# Do( \$request )
# Dispatch one request read by the listener:
#   - a comSAY message is an answer from the service for a connected client,
#   - a comASK message (only while a client is being served) is a client
#     request forwarded to the parent,
#   - STARTED / STATUS are status exchanges with the parent.
# Returns 1 when the request was handled, 0 when it was rejected or when the
# target connection is not available anymore.
sub Do {
    my $self = shift ;
    my $ref  = shift ;
    my $Ret  = 1 ;

    my ( @Req , @Ask );

    # Handle response from service to currently connected client as comSAY
    if ( @Req = &IsCom( comSAY , $$ref )) {
        # Cache the following test as re-used when true
        my $donetest = ( @Req > 1 ) ? &IsCom( comDONE , $Req[1] ) : 0 ;

        # Anyway count down job done
        $donetest and --$jobcount ;

        if (defined($XML{$Req[0]}) and defined($conn{$Req[0]})) {
            # Handle XML request
            $self->XMLEvent(@Req);

        } elsif (defined($conn{$Req[0]})) {
            # Handle direct external request
            &Debug("Con $Req[0]: Sending answer: $Req[1]");
            $clientrequest = $Req[0] ;
            $self->Answer( $Req[1] );

            # Reset the time out for this connection
            $timeout{$Req[0]} = [ &gettimeofday() ] ;

        } else {
            # Report the parsed message fields: no regex capture is set here,
            # so $1/$2 must not be used
            my $lost = defined($Req[1]) ? $Req[1] : '' ;
            &Warn("Connection $Req[0] not available, can't send answer: $lost");
            delete $XML{$Req[0]} if (defined($XML{$Req[0]}));
            delete $conn{$Req[0]} ;
            # Still return to not close non available connection
            return --$Ret ;
        }

        # Then close connection if required
        if ( $donetest and $CLOSE_CONNECTIONS) {
            &Info("Con $Req[0]: Closing connection with client");
            $self->CloseClient( $Req[0] );
        }

    # Handle client connection as comREQ
    } elsif ( $clientrequest and @Ask = &IsCom( comASK , $$ref )) {
        if ( $jobcount < $MAXJOBS ) {
            my $Con = $clientrequest ;
            # Necessary for &Answer to know message is for parent
            $clientrequest = 0 ;
            &Debug("Con $Con: request to $Ask[0] is '$Ask[1]'");
            $self->Answer( $Ask[0] , &GetCom( comASK , $Con => $Ask[1] ));
            $jobcount ++ ;

        } else {
            &Warn("Closing $clientrequest as max jobs count has been reached");
            $self->Answer( "ERROR: MAX JOBS COUNT REACHED" );
            $self->CloseClient($clientrequest);
        }

    # Handle status exchange with parent
    } elsif ( $$ref =~ m|^STARTED$|i ) {
        &Debug("I can say service is started");
        $self->{STARTED} ++ ;
        $self->Answer($$ref);

    } elsif ( $$ref =~ m|^STATUS$|i ) {
        $self->Answer($self->{STARTED});
        &Debug("Returned status '$self->{STARTED}'");

    } else {
        $self->ThisError("Got bad request '$$ref' for listener");
        $Ret -- ;
    }

    return $Ret ;
}

# DoBeforeQuit()
# Tell every connected client the service is stopping, close the connections,
# close the server sockets and remove the UNIX socket file.
sub DoBeforeQuit {
    my $self = shift ;

    $clientrequest = 0 ;

    # Close any client connection
    foreach my $ConnId (keys(%conn)) {
        # Answer() targets the client whose id is in $clientrequest
        $clientrequest = $ConnId ;
        $self->Answer( "Service is stopping" );
        $self->CloseClient( $ConnId );
    }

    close($self->{SERVER});
    # The TCP/IP server only exists when it was enabled in ThreadInit
    close($self->{TCPSERVER}) if (defined($self->{TCPSERVER}));
    unlink($LISTENER);
}

# ThreadInit()
# Open the UNIX socket server on $LISTENER and, when $USE_TCPIP_SOCKET is set,
# the TCP/IP server on $USE_TCPIP_PORT. Also build the list of authorized
# client addresses from $AUTHORIZED_HOST and reset the connection tables.
# Returns the result of $self->Error(...) as soon as a socket step fails.
sub ThreadInit {
    my $self = shift ;

    $SIG{'IO'}   = sub { &UPSTAT('GOT-SIGNAL-IO');   } ;
    $SIG{'PIPE'} = sub { &UPSTAT('GOT-SIGNAL-PIPE'); } ;

    # Open UNIX Socket
    my $sockaddr = pack_sockaddr_un( $LISTENER );
    socket( $self->{SERVER} , PF_UNIX,SOCK_STREAM , 0 )
        or return $self->Error("Listener can't create a socket: $!");
    bless $self->{SERVER} , "IO::Socket" ;
    # Remove any stale socket file so bind can succeed
    unlink($LISTENER);
    bind  ( $self->{SERVER} , $sockaddr )
        or return $self->Error("Listener can't bind server socket: $!");
    listen( $self->{SERVER} , SOMAXCONN )
        or return $self->Error("Listener can't listen server: $!");

    # Handle a list of available servers, only UNIX socket server by default
    $self->{SERVERS} = [ $self->{SERVER} ];

    # Open TCP/IP Socket
    if ($USE_TCPIP_SOCKET) {
        # Checking port
        &Warn("Unsupported port '$USE_TCPIP_PORT' for TCP/IP listening," .
            " resetting port to default " . ( $USE_TCPIP_PORT = 6400 ) )
            unless ( $USE_TCPIP_PORT > 0 and $USE_TCPIP_PORT < 65000 );

        my $proto = getprotobyname('tcp');
        socket( $self->{TCPSERVER}, PF_INET, SOCK_STREAM, $proto )
            or return $self->Error("Listener can't create a TCP/IP socket: $!");
        bless $self->{TCPSERVER} , "IO::Socket" ;
        setsockopt( $self->{TCPSERVER}, SOL_SOCKET, SO_REUSEADDR, pack("l", 1))
            or return $self->Error("Can't set TCP/IP REUSEADDR options: $!");
        setsockopt( $self->{TCPSERVER}, SOL_SOCKET, SO_KEEPALIVE, pack("l", 1))
            or return $self->Error("Can't set TCP/IP KEEPALIVE options: $!");

        if ($USE_ONLY_LOOPBACK) {
            $sockaddr = sockaddr_in( $USE_TCPIP_PORT , INADDR_LOOPBACK );
        } else {
            $sockaddr = sockaddr_in( $USE_TCPIP_PORT , INADDR_ANY );
        }

        bind( $self->{TCPSERVER}, $sockaddr )
            or return $self->Error("Can't bind server on TCP/IP socket: $!");
        listen( $self->{TCPSERVER} , SOMAXCONN )
            or return $self->Error("Can't listen as TCP/IP server: $!");

        # Add TCP/IP server to servers list
        push @{$self->{SERVERS}}, $self->{TCPSERVER} ;

        # Rebuild the authorization state from scratch
        @AUTHORIZED = ( INADDR_LOOPBACK ) ;
        $ANY_OK = 0 ;
        # split ' ' also skips leading whitespace, so no empty host is seen
        foreach my $host (split(' ',$AUTHORIZED_HOST)) {
            if ( $host =~ /^any$/i or $host eq '0.0.0.0' ) {
                $ANY_OK ++ ;
                next ;
            }
            my $packed = inet_aton($host) ;
            if (defined($packed)) {
                # Keep the packed address: this is the form sockaddr_in
                # returns for the peer when the connection is checked
                push @AUTHORIZED , $packed ;
                &Debug("IP ".inet_ntoa($packed)." is authorized to access a2p");
            } else {
                &Warn("Skipping $host as its ip wasn't resolved");
            }
        }
    }

    %conn = () ;
    %timeout = () ;
    $clientrequest = 0 ;
}

# CloseClient( $Conn [, $Err [, $Mesg ]] )
# Shut down the socket of connection $Conn and forget the connection and its
# time out. When $Err is true, $Mesg is first reported with ThisError and
# sent with Answer.
sub CloseClient {
    my $self = shift ;
    my $Conn = shift ;
    my $Err  = shift || 0 ;
    my $Mesg = shift || "" ;

    if ($Err) {
        $self->ThisError($Mesg);
        $self->Answer($Mesg);
    }

    my $socket = defined($conn{$Conn}) ? $conn{$Conn} : 0 ;
    if ($socket) {
        &Info("Con $Conn: Socket shutdown");
        shutdown( $socket , 2 );
    } else {
        &Info("Con $Conn: Socket still shutdown");
    }

    delete $conn{$Conn} ;
    delete $timeout{$Conn} ;
}

# NotConnected( $Conn )
# Returns 0 when a zero-timeout select() on the socket of connection $Conn
# reports nothing to read, 1 otherwise (including when the connection is
# unknown).
# NOTE(review): a socket holding unread client data is also reported by
# select() and would be seen as disconnected here - confirm this is intended.
sub NotConnected {
    my $self = shift ;
    my $Conn = shift ;

    my $socket = defined($conn{$Conn}) ? $conn{$Conn} : 0 ;
    if ($socket) {
        $! = 0 ;
        my $scheck = '' ;
        vec($scheck,fileno($socket),1) = 1;
        unless (select( $scheck , undef , undef, 0)) {
            &UPSTAT('IS-CONNECTED');
            &UPSTAT('CON-errno-'.int($!)."-$!") if $! ;
            return 0 ;
        }
        &Debug("Con $Conn: ".int($!)." - $!") if $! ;
    }

    &UPSTAT('IS-DISCONNECTED');
    &Debug("Con $Conn: Connection lost");
    return 1 ;
}

# GetRequest( $Requests )
# One polling step of the listener:
#   1. touch the listener socket file every 10 seconds,
#   2. close timed out or lost client connections,
#   3. read a request from $Requests, else accept a new client on one of the
#      servers, else read from the already opened connections.
# $clientrequest is set to the id of the client the request came from.
# Returns 1 when $MUSTQUIT is set, otherwise the request read (undef if none).
sub GetRequest {
    my $self = shift ;
    my $Requests = shift ;
    my $Ret = undef ;

    # Check to touch LISTENER time access
    if ( @timer and &tv_interval(\@timer) > 10 ) {
        if ( utime undef, undef, $LISTENER ) {
            @timer = &gettimeofday() ;
        } else {
            &Warn("Can't touch '$LISTENER' socket: $!");
            # Won't touch anymore the socket after one error
            @timer = () ;
        }
    }

    # Check connections time out
    foreach my $ConnId (keys(%timeout)) {
        if ( &tv_interval($timeout{$ConnId}) > $CLIENT_TIMEOUT ) {
            $clientrequest = $ConnId ;
            if ($self->Answer("ANSWERED REQUEST TIME OUT")) {
                &Info("Reach time out for connection $ConnId");
                $self->CloseClient($ConnId,$CLIENT_TIMEOUT,
                    "ANSWERED REQUEST TIME OUT for $ConnId client");
                $! = 0 ;
            } else {
                &Debug("Deleting still closed $ConnId connection");
                $self->CloseClient($ConnId);
            }

        } elsif ($self->NotConnected($ConnId)) {
            &Debug("Deleting lost $ConnId connection");
            $self->CloseClient($ConnId);
        }
    }

    # Reset client request id flag
    $clientrequest = 0 ;

    my ( $Client , $addr );
    if ( ! defined($Ret = $self->ReadSock($Requests))
        and keys(%conn) < $MAXCONNECTIONS ) {

        foreach my $Server ( @{$self->{SERVERS}} ) {
            # Accept should not be blocking
            $Server->blocking(0);
            if ( $addr = accept($Client,$Server) ) {
                bless $Client , "IO::Socket" ;
                $Client->autoflush(1);

                # Check client as the right to access us
                # in the case of TCP/IP server
                if ( $USE_TCPIP_SOCKET and $Server == $self->{TCPSERVER} ) {
                    my($port,$ip) = sockaddr_in($addr);

                    # If use_only_loopback is set, it is safe to accept
                    # Otherwise check other authorized ips
                    # $ip is a packed address: compare it as a plain string
                    # with the packed entries, never as a regex
                    unless ( $USE_ONLY_LOOPBACK or $ANY_OK or
                        grep { $_ eq $ip } @AUTHORIZED ) {
                        &Warn("Blocking " . inet_ntoa($ip) .
                            " without access authorization");
                        shutdown( $Client , 2 );
                        next ;
                    }
                }

                my $ConnId ;
                # ConnId is unique in one listener thread
                do { $ConnId = "CONN" . uc(&ShortID(5)) }
                    until ( ! defined($conn{$ConnId}) );

                &Info("Con $ConnId: Got connection with a client");
                &Debug("Got connection $ConnId on " . $Client);
                $Ret = $self->ReadSock($Client) ;
                $conn{$ConnId} = $Client ;
                $timeout{$ConnId} = [ &gettimeofday() ] ;
                $clientrequest = $ConnId if (defined($Ret));
                last ;
            }
        }
    }

    # Now eventually get request from opened connections
    if (!defined($Ret)) {
        foreach my $ConnId (keys(%conn)) {
            $Client = $conn{$ConnId} ;
            if (defined($Ret = $self->ReadSock($Client))) {
                $clientrequest = $ConnId ;
                last ;
            }
        }
    }

    return $MUSTQUIT ? 1 : $Ret ;
}

# _AbortXML( $XML, $Conn, $abterm )
# Private helper: report $abterm with ThisError, record it as A2P-ABTERM on
# the XML object, call A2Pabterm on it, send the XML answer to connection
# $Conn and close that connection. Always returns 0.
sub _AbortXML {
    my ( $self , $XML , $Conn , $abterm ) = @_ ;

    $self->ThisError($abterm);
    $XML->A2Pmessage("A2P-ABTERM = $abterm");
    $XML->A2Pabterm ;
    $self->XMLanswer($Conn);
    $self->CloseClient($Conn);

    return 0 ;
}

# XML( $buffer )
# Handle an external XML request coming from the current client
# ($clientrequest). $buffer is the first text read; following lines are read
# from the client socket until the closing tag of the root node is seen.
# A valid request pointing to an existing file is forwarded as a job through
# Do(); any other case is answered with an A2P-ABTERM message and the client
# connection is closed (returning 0).
sub XML {
    my $self = shift ;
    my $buffer = shift || "" ;

    # Handle external XML request
    if ( $clientrequest and $jobcount < $MAXJOBS) {
        &Debug("Got an external XML request");

        # We have to concatenate any other lines until got a valid xml request
        # 1. First create XML object
        my $XML = $XML{$clientrequest} = A2P::XML->new(\$buffer);

        # 2. Concatenate on the buffer to get the all XML
        # Remove any tag corresponding to XML version
        $buffer =~ s|<\?xml\s.*\?>|| ;

        my $Client = $conn{$clientrequest} ;
        $buffer = $self->ReadSock($Client) unless ( $buffer =~ m|^<\w+| );
        # Nothing read: keep a defined buffer so the root check below fails
        $buffer = "" unless (defined($buffer));

        # Found XML root node to know how XML should be closed
        my ( $root ) = $buffer =~ m|^<(\w+)\s*[^>]*/*>| ;
        unless (defined($root) and $root) {
            $self->ThisError("Can't find XML root in '$buffer' XML request");
            $buffer = "" ;
            $XML->isA2P ;
            $XML->A2Pmessage("A2P-ABTERM = NO XML ROOT FOUND");
            $self->XMLanswer($clientrequest);
            $self->CloseClient($clientrequest);
            return 0 ;
        }
        &Debug("'$root' XML root node found");

        # Only concatenate if we don't get empty root node
        unless ( $buffer =~ m|^<($root)[^>]*/>| ) {
            my $line = "" ;
            # Get next content on socket until aggregation seems finalized as
            # xml, i.e. until the closing tag of the root node is read
            # ($root only holds \w characters, so it is safe in the pattern)
            while (defined($Client) and $Client and defined($line) and
                $line !~ m|</$root>| ) {
                # Loop on socket
                $line = $self->ReadSock($Client) ;
                # Stop on a failed read instead of concatenating undef
                last unless (defined($line));
                &Debug("Concatenate XML with '$line'");
                $XML->concatenate($line . "\n");
            }
        }

        &Debug("Full request is '$buffer'") unless (length($buffer)>100);

        return $self->_AbortXML( $XML , $clientrequest ,
            "XML request is not for me" )
            unless ( $XML->isA2P );

        return $self->_AbortXML( $XML , $clientrequest ,
            "Mal-formed XML request not supported" )
            unless ( $XML->getPATH );

        my $file = $XML->getPATH ;
        return $self->_AbortXML( $XML , $clientrequest ,
            "Can't convert not existing '$file'" )
            unless ( -e $file );

        # Ask to do the job
        my $request = &GetCom( comASK ,
            JobManager => &GetCom( comJOB , $XML->getid , $file ));
        return $self->Do( \$request ); # This also update jobcount

    } elsif ($clientrequest) {
        &Warn("Closing $clientrequest as max jobs count has been reached");
        &Debug("Closing $clientrequest as $jobcount jobcount >= to $MAXJOBS");
        $buffer = "" ;
        my $XML = $XML{$clientrequest} = A2P::XML->new(\$buffer);
        $XML->isA2P ;
        $XML->A2Pmessage("A2P-ABTERM = MAX JOBS COUNT REACHED ON A2P SERVICE");
        $self->XMLanswer($clientrequest);
        $self->CloseClient($clientrequest);

    } else {
        $self->ThisError("[DEV] Unexpected such '$buffer' XML request");
    }
}

# Event announcing the job name and the a2p version processing it
my $qr_version_event =
    qr/Step 2, Processing AFP JOB '([-a-zA-Z0-9._]+)' with a2p v([0-9.]+)/ ;

# XMLEvent( $conn, $event )
# Record a service event on the XML object attached to connection $conn:
# version/jobname, ABTERM, zip file and generated PDF events update the XML
# object; a JOBSTATUS event sets the state and sends the XML answer to the
# client. Returns 0 when no XML object exists anymore for $conn.
sub XMLEvent {
    my $self  = shift ;
    my $conn  = shift ;
    my $event = shift ;

    # Do() may pass a message without any text
    $event = "" unless (defined($event));

    unless (defined($XML{$conn})) {
        &Warn("Can't handle '$event' XML event as '$conn' XML is destroyed");
        return 0 ;
    }

    my $XML = $XML{$conn} ;

    # Get version and jobname if available
    # Pattern: Step 2, Processing AFP JOB '$jobname' with a2p v$version
    return $XML->A2Pmessage( "A2P-Version = $2", "A2P-Jobname = $1" )
        if ( $event =~ $qr_version_event );

    # Log any ABTERM event
    if ( $event =~ /abterm:\s*(.*)$/i ) {
        $XML->A2Pmessage("A2P-ABTERM = $1");
        return $XML->A2Pabterm ;
    }

    # Get zipfile
    return $XML->A2Pmessage("A2P-Zipfile = $1")
        if ( $event =~ /ZIPFILE=(.*)$/ );

    # Get PDF for each task
    # Pattern: Generating PDF '$PDFNAME'
    return $XML->A2Ppdf( $1 )
        if ( $event =~ /Generating PDF '([-a-zA-Z0-9._]+\.pdf)'/ );

    # Finally check state and return answer to client
    if ( $event =~ /JOBSTATUS:\s+(\w+)/ ) {
        my $state = $1 ;
        $XML->A2Pmessage( "A2P-State = $state" );
        $XML->setstate( $state );
        $self->XMLanswer($conn);

    } else {
        &Debug("Forgetting info '$event'");
    }
}

# XMLanswer( $conn )
# Print the A2P answer of the XML object attached to connection $conn on the
# client socket, then remove the XML object. Only warns when the socket or
# the XML object is missing.
sub XMLanswer {
    my $self = shift ;
    my $conn = shift ;

    my $Socket = $conn{$conn} ;
    my $XML    = $XML{$conn} ;
    if (defined($Socket) and defined($XML)) {
        $Socket->autoflush(1);
        print {$Socket} $XML->A2Panswer(), "\n";

    } elsif (!defined($Socket)) {
        &Warn("Can't send XML answer on '$conn' connection as it seems lost");

    } else {
        &Warn("Can't send XML answer on no XML was defined");
    }

    delete $XML{$conn} ;
}

# Answer( $message ) or Answer( $target, $message )
# Send a comCOM message tagged with this process id. The peer is the client
# whose id is in $clientrequest, or the parent ($self->{DADDY}) when
# $clientrequest is 0. With two arguments the message is first wrapped as a
# comREQ. Returns 0 when no socket is available, otherwise the result of the
# write.
sub Answer {
    my $self = shift ;

    my $Socket = $clientrequest ? $conn{$clientrequest} : $self->{DADDY} ;
    return 0 unless (defined($Socket));

    my @Answer = &GetCom( comCOM , $$ =>
        ( @_ > 1 ? &GetCom( comREQ , @_ ) : $_[0] ));

    my $Ret ;
    if ($clientrequest) {
        # Clients are written to directly, one message per line
        $Ret = print {$Socket} @Answer, "\n" ;
    } else {
        $Ret = $self->PrintSock( $Socket, @Answer );
    }
    &Debug("Listener answer not sent, ".__PACKAGE__.
        ", l. ".__LINE__." ($!-$?-$@)")
        unless ($Ret);

    &Debug("Bad listener answer request: @_") if ( $#_ > 1);

    return $Ret ;
}

&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");

1;