#
# Copyright (c) 2004-2007 - Consultas, PKG.fr
#
# This file is part of A2P.
#
# A2P is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# A2P is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with A2P; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# $Id: Listener.pm 3 2007-10-18 16:20:19Z guillaume $
#
package A2P::Listener;
# Derived class from Thread.pm
use base qw(A2P::Thread Exporter);
use strict ;
use Socket ;
use IO::Socket ;
use Time::HiRes qw(ualarm gettimeofday tv_interval) ;
use A2P::Globals ;
use A2P::Syslog ;
use A2P::Com qw( IsCom GetCom comSAY comREQ comCOM comDONE comASK comJOB );
use A2P::Tools qw( ShortID );
use A2P::XML ;
BEGIN {
our $VERSION = sprintf "%s", q$Rev: 450 $ =~ /(\d[0-9.]+)\s+/ ;
}
our $VERSION ;
my %conn = () ;
my %XML = () ;
my %timeout = () ;
my $clientrequest = 0 ;
my @AUTHORIZED = ( INADDR_LOOPBACK ) ;
my $ANY_OK = 0 ;
my $jobcount = 0 ;
my @timer = &gettimeofday() ;
################################################################################
## Listener handling code ##
################################################################################
sub Do {
my $self = shift ;
my $ref = shift ;
my $Ret = 1 ;
my ( @Req , @Ask );
# Handle response from service to currently connected client as comSAY
if ( @Req = &IsCom( comSAY , $$ref )) {
# Cache the following test as re-used when true
my $donetest = ( @Req > 1 ) ? &IsCom( comDONE , $Req[1] ) : 0 ;
# Anyway count down job done
$donetest and --$jobcount ;
if (defined($XML{$Req[0]}) and defined($conn{$Req[0]})) {
# Handle XML request
$self->XMLEvent(@Req);
} elsif (defined($conn{$Req[0]})) {
# Handle direct external request
&Debug("Con $Req[0]: Sending answer: $Req[1]");
$clientrequest = $Req[0] ;
$self->Answer( $Req[1] );
# Reset the time out for this connection
$timeout{$Req[0]} = [ &gettimeofday() ] ;
} else {
&Warn("Connection $1 not available, can't send answer: $2");
delete $XML{$Req[0]} if (defined($XML{$Req[0]}));
delete $conn{$Req[0]} ;
# Still return to not close non available connection
return --$Ret ;
}
# Then close connection if required
if ( $donetest and $CLOSE_CONNECTIONS) {
&Info("Con $Req[0]: Closing connection with client");
$self->CloseClient( $Req[0] );
}
# Handle client connection as comREQ
} elsif ( $clientrequest and @Ask = &IsCom( comASK , $$ref )) {
if ( $jobcount < $MAXJOBS ) {
my $Con = $clientrequest ;
# Necessary for &Answer to know message is for parent
$clientrequest = 0 ;
&Debug("Con $Con: request to $Ask[0] is '$Ask[1]'");
$self->Answer( $Ask[0] , &GetCom( comASK , $Con => $Ask[1] ));
$jobcount ++ ;
} else {
&Warn("Closing $clientrequest as max jobs count has been reached");
$self->Answer( "ERROR: MAX JOBS COUNT REACHED" );
$self->CloseClient($clientrequest);
}
# Handle status exchange with parent
} elsif ( $$ref =~ m|^STARTED$|i ) {
&Debug("I can say service is started");
$self->{STARTED} ++ ;
$self->Answer($$ref);
} elsif ( $$ref =~ m|^STATUS$|i ) {
$self->Answer($self->{STARTED});
&Debug("Returned status '$self->{STARTED}'");
} else {
$self->ThisError("Got bad request '$$ref' for listener");
$Ret -- ;
}
return $Ret ;
}
sub DoBeforeQuit {
my $self = shift ;
$clientrequest = 0 ;
# Close any client connection
map {
$clientrequest = $_ ;
$self->Answer( "Service is stopping" );
$self->CloseClient( $_ )
} keys(%conn) ;
close($self->{SERVER});
unlink($LISTENER);
}
sub ThreadInit {
my $self = shift ;
$SIG{'IO'} = sub { &UPSTAT('GOT-SIGNAL-IO'); } ;
$SIG{'PIPE'} = sub { &UPSTAT('GOT-SIGNAL-PIPE'); } ;
# Open UNIX Socket
my $sockaddr = pack_sockaddr_un( $LISTENER );
socket( $self->{SERVER} , PF_UNIX,SOCK_STREAM , 0 )
or return $self->Error("Listener can't create a socket: $!");
bless $self->{SERVER} , "IO::Socket" ;
unlink($LISTENER);
bind ( $self->{SERVER} , $sockaddr )
or return $self->Error("Listener can't bind server socket: $!");
listen( $self->{SERVER} , SOMAXCONN )
or return $self->Error("Listener can't listen server: $!");
# Handle a list of available servers, only UNIX socket server by default
$self->{SERVERS} = [ $self->{SERVER} ];
# Open TCP/IP Socket
if ($USE_TCPIP_SOCKET) {
# Checking port
&Warn("Unsupported port '$USE_TCPIP_PORT' for TCP/IP listening," .
" resetting port to default " . ( $USE_TCPIP_PORT = 6400 ) )
unless ( $USE_TCPIP_PORT > 0 and $USE_TCPIP_PORT < 65000 );
my $proto = getprotobyname('tcp');
socket( $self->{TCPSERVER}, PF_INET, SOCK_STREAM, $proto )
or return $self->Error("Listener can't create a TCP/IP socket: $!");
bless $self->{TCPSERVER} , "IO::Socket" ;
setsockopt( $self->{TCPSERVER}, SOL_SOCKET, SO_REUSEADDR, pack("l", 1))
or return $self->Error("Can't set TCP/IP REUSEADDR options: $!");
setsockopt( $self->{TCPSERVER}, SOL_SOCKET, SO_KEEPALIVE, pack("l", 1))
or return $self->Error("Can't set TCP/IP KEEPALIVE options: $!");
if ($USE_ONLY_LOOPBACK) {
$sockaddr = sockaddr_in( $USE_TCPIP_PORT , INADDR_LOOPBACK );
} else {
$sockaddr = sockaddr_in( $USE_TCPIP_PORT , INADDR_ANY );
}
bind( $self->{TCPSERVER}, $sockaddr )
or return $self->Error("Can't bind server on TCP/IP socket: $!");
listen( $self->{TCPSERVER} , SOMAXCONN )
or return $self->Error("Can't listen as TCP/IP server: $!");
# Add TCP/IP server to servers list
push @{$self->{SERVERS}}, $self->{TCPSERVER} ;
@AUTHORIZED = ( INADDR_LOOPBACK ) ;
foreach my $host (split(/\s+/,$AUTHORIZED_HOST)) {
if ( $host =~ /^any$/i or $host eq '0.0.0.0' ) {
$ANY_OK ++ ;
next ;
}
my $packed = inet_aton($host) ;
if (defined($packed)) {
push @AUTHORIZED , $host ;
&Debug("IP ".inet_ntoa($packed)." is authorized to access a2p");
} else {
&Warn("Skipping $host as its ip wasn't resolved");
}
}
}
%conn = () ;
%timeout = () ;
$clientrequest = 0 ;
}
sub CloseClient {
my $self = shift ;
my $Conn = shift ;
my $Err = shift || 0 ;
my $Mesg = shift || "" ;
$self->ThisError($Mesg), $self->Answer($Mesg) if $Err ;
my $socket = defined($conn{$Conn}) ? $conn{$Conn} : 0 ;
if ($socket) {
&Info("Con $Conn: Socket shutdown");
shutdown( $socket , 2 );
} else {
&Info("Con $Conn: Socket still shutdown");
}
delete $conn{$Conn} ;
delete $timeout{$Conn} ;
}
sub NotConnected {
my $self = shift ;
my $Conn = shift ;
my $socket = defined($conn{$Conn}) ? $conn{$Conn} : 0 ;
if ($socket) {
$! = 0 ;
my $scheck = '' ;
vec($scheck,fileno($socket),1) = 1;
unless (select( $scheck , undef , undef, 0)) {
&UPSTAT('IS-CONNECTED');
&UPSTAT('CON-errno-'.int($!)."-$!") if $! ;
return 0 ;
}
&Debug("Con $Conn: ".int($!)." - $!") if $! ;
}
&UPSTAT('IS-DISCONNECTED');
&Debug("Con $Conn: Connection lost");
return 1 ;
}
sub GetRequest {
my $self = shift ;
my $Requests = shift ;
my $Ret = undef ;
# Check to touch LISTENER time access
if ( @timer and &tv_interval(\@timer) > 10 ) {
if ( utime undef, undef, $LISTENER ) {
@timer = &gettimeofday() ;
} else {
&Warn("Can't touch '$LISTENER' socket: $!");
# Won't touch anymore the socket after one error
@timer = () ;
}
}
# Check connections time out
map {
if ( &tv_interval($timeout{$_}) > $CLIENT_TIMEOUT ) {
$clientrequest = $_ ;
if ($self->Answer("ANSWERED REQUEST TIME OUT")) {
&Info("Reach time out for connection $_");
$self->CloseClient($_,$CLIENT_TIMEOUT,
"ANSWERED REQUEST TIME OUT for $_ client");
$! = 0 ;
} else {
&Debug("Deleting still closed $_ connection");
$self->CloseClient($_);
}
} elsif ($self->NotConnected($_)) {
&Debug("Deleting lost $_ connection");
$self->CloseClient($_);
}
} keys(%timeout);
# Reset client request id flag
$clientrequest = 0 ;
my ( $Client , $addr );
if ( ! defined($Ret = $self->ReadSock($Requests))
and keys(%conn) < $MAXCONNECTIONS ) {
foreach my $Server ( @{$self->{SERVERS}} ) {
# Accept should not be blocking
$Server->blocking(0);
if ( $addr = accept($Client,$Server) ) {
bless $Client , "IO::Socket" ;
$Client->autoflush(1);
# Check client as the right to access us
# in the case of TCP/IP server
if ( $USE_TCPIP_SOCKET and $Server == $self->{TCPSERVER} ) {
my($port,$ip) = sockaddr_in($addr);
# If use_only_loopback is set, it is safe to accept
# Otherwise check other authorized ips
unless ( $USE_ONLY_LOOPBACK or $ANY_OK
or grep { /^$ip$/ } @AUTHORIZED ) {
&Warn("Blocking $ip without access authorization");
shutdown( $Client , 2 );
next ;
}
}
my $ConnId ;
# ConnId is unique in one listener thread
do {
$ConnId = "CONN" . uc(&ShortID(5))
} until ( ! defined($conn{$ConnId}) );
&Info("Con $ConnId: Got connection with a client");
&Debug("Got connection $ConnId on " . $Client);
$Ret = $self->ReadSock($Client) ;
$conn{$ConnId} = $Client ;
$timeout{$ConnId} = [ &gettimeofday() ] ;
$clientrequest = $ConnId if (defined($Ret));
last ;
}
}
}
# Now eventually get request from opened connections
if (!defined($Ret)) {
foreach my $ConnId (keys(%conn)) {
$Client = $conn{$ConnId} ;
if (defined($Ret = $self->ReadSock($Client))) {
$clientrequest = $ConnId ;
last ;
}
}
}
return $MUSTQUIT ? 1 : $Ret ;
}
sub XML {
my $self = shift ;
my $buffer = shift || "" ;
# Handle external XML request
if ( $clientrequest and $jobcount < $MAXJOBS) {
&Debug("Got an external XML request");
# We have to concatenate any other lines until got a valid xml request
# 1. First create XML object
my $XML = $XML{$clientrequest} = new A2P::XML(\$buffer);
# 2. Concatenate on the buffer to get the all XML
# Remove any tag corresponding to XML version
$buffer =~ s|<\?xml\s.*\?>|| ;
my $Client = $conn{$clientrequest} ;
$buffer = $self->ReadSock($Client) unless ( $buffer =~ m|^<\w+| );
# Found XML root node to know how XML should be closed
my ( $root ) = $buffer =~ m|^<(\w+)\s*[^>]*/*>| ;
unless (defined($root) and $root) {
$self->ThisError("Can't find XML root in '$buffer' XML request");
$buffer = "" ;
$XML->isA2P ;
$XML->A2Pmessage("A2P-ABTERM = NO XML ROOT FOUND");
$self->XMLanswer($clientrequest);
$self->CloseClient($clientrequest);
return 0 ;
}
&Debug("'$root' XML root node found");
# Only concatenate if we don't get empty root node
unless ( $buffer =~ m|^<($root)[^>]*/>| ) {
my $line = "" ;
# Get next content on socket until agregation seems finalized as xml
while (defined($Client) and $Client
and defined($line) and $line !~ m|$root>| ) {
# Loop on socket
$line = $self->ReadSock($Client) ;
&Debug("Concatenate XML with '$line'");
$XML->concatenate($line . "\n");
}
}
&Debug("Full request is '$buffer'") unless (length($buffer)>100);
unless ( $XML->isA2P ) {
my $abterm = "XML request is not for me" ;
$self->ThisError($abterm);
$XML->A2Pmessage("A2P-ABTERM = $abterm");
$XML->A2Pabterm ;
$self->XMLanswer($clientrequest);
$self->CloseClient($clientrequest);
return 0 ;
}
if ( $XML->getPATH ) {
my $file = $XML->getPATH ;
if ( -e $file ) {
# Ask to do the job
my $request = &GetCom( comASK ,
JobManager => &GetCom( comJOB , $XML->getid , $file ));
$self->Do( \$request ); # This also update jobcount
} else {
my $abterm = "Can't convert not existing '$file'" ;
$self->ThisError($abterm);
$XML->A2Pmessage("A2P-ABTERM = $abterm");
$XML->A2Pabterm ;
$self->XMLanswer($clientrequest);
$self->CloseClient($clientrequest);
return 0 ;
}
} else {
my $abterm = "Mal-formed XML request not supported" ;
$self->ThisError($abterm);
$XML->A2Pmessage("A2P-ABTERM = $abterm");
$XML->A2Pabterm ;
$self->XMLanswer($clientrequest);
$self->CloseClient($clientrequest);
return 0 ;
}
} elsif ($clientrequest) {
&Warn("Closing $clientrequest as max jobs count has been reached");
&Debug("Closing $clientrequest as $jobcount jobcount >= to $MAXJOBS");
$buffer = "" ;
my $XML = $XML{$clientrequest} = new A2P::XML(\$buffer);
$XML->isA2P ;
$XML->A2Pmessage("A2P-ABTERM = MAX JOBS COUNT REACHED ON A2P SERVICE");
$self->XMLanswer($clientrequest);
$self->CloseClient($clientrequest);
} else {
$self->ThisError("[DEV] Unexpected such '$buffer' XML request");
}
}
my $qr_version_event =
qr/Step 2, Processing AFP JOB '([-a-zA-Z0-9._]+)' with a2p v([0-9.]+)/ ;
sub XMLEvent {
my $self = shift ;
my $conn = shift ;
my $event = shift ;
unless (defined($XML{$conn})) {
&Warn("Can't handle '$event' XML event as '$conn' XML is destroyed");
return 0 ;
}
my $XML = $XML{$conn} ;
# Get version and jobname if available
# Pattern: Step 2, Starting AFP JOB '$jobname' with a2p v$version
return $XML->A2Pmessage( "A2P-Version = $2", "A2P-Jobname = $1" )
if ( $event =~ $qr_version_event );
# Log any ABTERM event
if ( $event =~ /abterm:\s*(.*)$/i ) {
$XML->A2Pmessage("A2P-ABTERM = $1");
return $XML->A2Pabterm ;
}
# Get zipfile
return $XML->A2Pmessage("A2P-Zipfile = $1")
if ( $event =~ /ZIPFILE=(.*)$/ );
# Get PDF for each task
# Pattern: Generating PDF '$PDFNAME'
return $XML->A2Ppdf( $1 )
if ( $event =~ /Generating PDF '([-a-zA-Z0-9._]+\.pdf)'/ );
# Finally check state and return answer to client
if ( $event =~ /JOBSTATUS:\s+(\w+)/ ) {
my $state = $1 ;
$XML->A2Pmessage( "A2P-State = $state" );
$XML->setstate( $state );
$self->XMLanswer($conn);
} else {
&Debug("Forgetting info '$event'");
}
}
sub XMLanswer {
# Return answer and remove XML object
my $self = shift ;
my $conn = shift ;
my $Socket = $conn{$conn} ;
my $XML = $XML{$conn} ;
if (defined($Socket) and defined($XML)) {
$Socket->autoflush(1);
print $Socket $XML->A2Panswer(), "\n";
} elsif (!defined($Socket)) {
&Warn("Can't send XML answer on '$conn' connection as it seems lost");
} else {
&Warn("Can't send XML answer on no XML was defined");
}
delete $XML{$conn} ;
}
sub Answer {
my $self = shift ;
my $Socket = $clientrequest ? $conn{$clientrequest} : $self->{DADDY} ;
return 0 unless (defined($Socket));
my @Answer =
&GetCom( comCOM , $$ => ( @_ > 1 ? &GetCom( comREQ , @_ ) : $_[0] ));
my $Ret = $clientrequest ? print $Socket @Answer,"\n" :
$self->PrintSock( $Socket, @Answer )
or &Debug("Listener answer not sent, ".__PACKAGE__.
", l. ".__LINE__." ($!-$?-$@)");
&Debug("Bad listener answer request: @_") if ( $#_ > 1);
return $Ret ;
}
&Debug("Module " . __PACKAGE__ . " v$VERSION loaded");
1;