#!/usr/bin/python # -*- coding: iso-8859-1 -*- # # MailFilter - Mail filter to replace procmail. # Copyright (C) 2004 Frédéric Jolliton # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # # Policy when error are encountered: # # We backup the mail in a special directory. It will be # at the admin discretion to feed it again to this program # (or may be script that.) # # # TODO: # # [ ] Define precisely what return code use for each possible case. # import sys import os import time import email import types import re import confparser from os import EX_USAGE, EX_OK, EX_NOUSER, EX_TEMPFAIL, EX_DATAERR # EX_OK 0 ok # EX_USAGE 64 command line usage error # EX_DATAERR 65 data format error # EX_NOINPUT 66 cannot open input # EX_NOUSER 67 addressee unknown # EX_NOHOST 68 host name unknown # EX_UNAVAILABLE 69 service unavailable # EX_SOFTWARE 70 internal software error # EX_OSERR 71 system error (e.g., can't fork) # EX_OSFILE 72 critical OS file missing # EX_CANTCREAT 73 can't create (user) output file # EX_IOERR 74 input/output error # EX_TEMPFAIL 75 temp failure; user is invited to retry # EX_PROTOCOL 76 remote error in protocol # EX_NOPERM 77 permission denied # EX_CONFIG 78 configuration error # # Path to subprocess module. Ideally not needed if subprocess # (formerly popen5) is installed into /site-packages/ directory. # #sys.path.insert( 0 , '/usr/local/lib/python/' ) # # subprocess (formerly popen5) - See PEP 324 # http://www.lysator.liu.se/~astrand/popen5/ # # >>> cat = subprocess.Popen( 'cat' , stdin = subprocess.PIPE , stdout = subprocess.PIPE ) # >>> cat.communicate( 'bla' ) # ('bla', None) # >>> cat.returncode # 0 # try : import subprocess except ImportError : try : import popen5 as subprocess except ImportError : print 'Please install subprocess module.' print 'See http://www.lysator.liu.se/~astrand/popen5/.' sys.exit( 1 ) #--[ Configuration variables ]------------------------------------------------ # # Filename where to put log. # g_pathLog = '/var/log/mail.filter.log' # # For which users receiving a mail should we send a UDP packet. # g_userNotificationFilter = [ 'fred' ] # # For which IP address should we send the notification. # (Can include broadcast address.) # g_notificationAddresses = [ '192.168.1.255' ] # # On which port should we send the notification. # g_notificationPort = 23978 # # Max mail size to be processed by this script. # # Larger mail are just not filtered. # g_maxMailSize = 2 * 1024 * 1024 # # Where to save copy of mail in case of error. # g_directoryBackup = '/var/mail.filter/recovery/' # # Where to find rules about each user. # # Filename for user 'joe' will be named 'joe.mf' in that # directory. If the file doesn't exist, no filtering is # done (not even spam/virus filtering.) # g_directoryRules = '/var/mail.filter/rules/' #--[ External commands ]------------------------------------------------------ # # Path to Cyrus's deliver binary. # g_pathCyrusDeliver = '/usr/cyrus/bin/deliver' # # Path to spamprobe binary. # g_pathSpamProbe = '/usr/bin/spamprobe' g_pathSpamProbeDb = '/var/spamprobe/db' # # Path to ClamAV binary. # # Could point either to 'clamdscan' or 'clamscan'. # # The first one is *HIGHLY* recommended since # it will use the ClamAV daemon. # g_pathClamdscan = '/usr/bin/clamdscan' #--[ Global variables ]------------------------------------------------------- # # Should the log be also printed on stdout ? # g_copyLogToStdout = False # # Don't actually feed the mail to Cyrus. # g_testMode = False # # The user name of the recipient. # g_user = None # # The current mail as string (as read from stdin.) # g_mailText = None # # The current mail as email.Message.Message object. # g_mail = None #----------------------------------------------------------------------------- # # check if predicate is True for all items in list 'lst'. # def all( lst , predicate ) : for item in lst : if not predicate( item ) : return False return True # # check if predicate is True for at least one item in list 'lst'. # def some( lst , predicate ) : for item in lst : if predicate( item ) : return True return False # # Remove leading and trailing blank, and replace any # blank character sequence by one space character. # def normalizeBlank( s ) : return ' '.join( s.split() ) #----------------------------------------------------------------------------- # # Utility function to return traceback as string from most recent # exception. # def getTraceBack() : import traceback, sys return ''.join( traceback.format_exception( *sys.exc_info() ) ) # # Return (returnCode, stdout, stderr) # def pipe( cmd , input ) : p = subprocess.Popen( cmd , stdin = subprocess.PIPE , stdout = subprocess.PIPE , stderr = subprocess.PIPE ) try : # much faster than passing 'input' to communicate directly.. p.stdin.write( input ) except IOError : pass r = p.communicate() return p.returncode , r[ 0 ] , r[ 1 ] # # Return an ISO-8661 date representation for the UTC # timezone. # # timestamp( 0 ) => 1970-01-01T00:00:00Z # def timestamp() : t = time.gmtime() return '%04d-%02d-%02dT%02d:%02d:%02dZ' % t[ : 6 ] # # Log message 'msg'. # def logMessage( msg ) : if not logMessage.logFile and not g_testMode : # # If log file is not yet open, try to open it. # try : logMessage.logFile = open( g_pathLog , 'a+' ) except : if not g_copyLogToStdout : return msg = msg.splitlines() prefix = timestamp() + ' [%s] ' % os.getpid() # # Output to log file. # if logMessage.logFile : for line in msg : line = prefix + line try : logMessage.logFile.write( line + '\n' ) logMessage.logFile.flush() except : pass # # Output to standard output. # if g_copyLogToStdout : for line in msg : line = prefix + line sys.stdout.write( line + '\n' ) sys.stdout.flush() logMessage.logFile = None # # Make a backup of the mail (in case it's impossible # to store the mail to Cyrus.) # def backup( filenamePrefix = None ) : if g_testMode : logMessage( 'TEST MODE: Backup of the mail requested.' ) return try : # Ensure directory exist import os os.makedirs( g_directoryBackup ) except : pass basename = '' if filenamePrefix : basename += filenamePrefix + '-' # # Append current unix time as suffix # basename += '%.3f' % time.time() fn = g_directoryBackup + '/' + basename try : f = open( fn , 'a+' ) f.write( g_mailText ) f.close() except : logMessage( 'PANIC: Unable to write backup to %s.' % fn ) else : logMessage( 'Message appended to backup directory as `%s\'.' % basename ) #----------------------------------------------------------------------------- class Action : pass class NullAction( Action ) : def __repr__( self ) : return '' class FileToFolderAction( Action ) : def __init__( self , folder ) : self.folder = folder def __repr__( self ) : return '' % ( self.folder , ) class CustomErrorCodeAction( Action ) : def __init__( self , code ) : self.code = code def __repr__( self ) : return '' % ( self.code , ) #----------------------------------------------------------------------------- # # Experimental ! # # Packet payload contains: # # + char( 0 ) + + char( 0 ) # def notifyDeliver( user , folder ) : if user not in g_userNotificationFilter : return try : import socket s = socket.socket( socket.AF_INET , socket.SOCK_DGRAM ) msg = user + chr( 0 ) + folder + chr( 0 ) for address in g_notificationAddresses : s.sendto( msg , ( address , g_notificationPort ) ) except : pass # # Deliver a mail to Cyrus for user 'username' in # folder 'folderName' (or default folder if not # specified.) # def deliverTo( username , folderName = None ) : if not folderName : pseudoFolderName = 'INBOX' folderName = 'user.' + username else : pseudoFolderName = 'INBOX.' + folderName folderName = 'user.' + username + '.' + folderName # # Build the command line for running deliver. # cmd = [ g_pathCyrusDeliver ] cmd += [ '-a' , username ] cmd += [ '-m' , folderName ] if g_testMode : logMessage( 'TEST MODE: Delivering mail in `%s\' requested.' % ( folderName , ) ) logMessage( 'TEST MODE: Command: %r.' % cmd ) return EX_OK try : rc , stdout , stderr = pipe( cmd , g_mailText ) except OSError , e : logMessage( 'Error running `%s\': %s.' % ( cmd[ 0 ] , e[ 1 ] ) ) return EX_TEMPFAIL if rc == EX_OK : logMessage( 'Message delivered in folder `%s\'.' % folderName ) notifyDeliver( username , pseudoFolderName ) else : errorMessage = stdout.rstrip() # # Extract raw error message # # Example of output: # # +user.fred: Message contains invalid header # m = errorMessage.split( ': ' , 1 ) if len( m ) == 2 : m = m[ 1 ] else : m = None rcMsg = '%d' % rc if m == 'Message contains invalid header' : rc = EX_DATAERR rcMsg += '->%d' % rc elif m == 'Mailbox does not exist' : rc = EX_NOUSER rcMsg += '->%d' % rc else : # FIXME: DATAERR ok here ? rc = EX_DATAERR logMessage( 'Refused by Cyrus: [%s] `%s\'.' % ( rcMsg , errorMessage ) ) return rc #--[ Antivirus ]-------------------------------------------------------------- # # Return virus list from the output of ClamAV. # def extractVirusList( clamdOutput ) : res = [] for line in clamdOutput.splitlines() : r = extractVirusList.reClamdVirus.search( line.rstrip() ) if r == None : continue res.append( r.group( 1 ) ) return res extractVirusList.reClamdVirus = re.compile( r'^[^:]+: (\S+) FOUND$' ) # # Check for virus. # # Return True if mail is clean. # def antivirusScan() : cmd = [ g_pathClamdscan , '-' ] if g_testMode : logMessage( 'TEST MODE: Virus scan requested.' ) logMessage( 'TEST MODE: Command: %r.' % cmd ) return True rc , stdout , stderr = pipe( cmd , g_mailText ) output = stderr or '' #logMessage( 'clamdscan returned %s' % rc ) if rc == 2 : raise 'Unable to scan for viruses (%s)' % cmd ok = not rc if not ok : msg = 'Virus found.' viruses = extractVirusList( output ) if viruses : msg += ' [%s]' % ' '.join( viruses ) logMessage( msg ) return ok #--[ Antispam ]--------------------------------------------------------------- # # Check for spam. # # Return True if mail is correct. # def spamScan() : if not g_user : return True cmd = [ g_pathSpamProbe ] cmd += [ '-d' , g_pathSpamProbeDb + '/' + g_user + '/' ] cmd += [ 'receive' ] if g_testMode : logMessage( 'TEST MODE: Spam scan requested.' ) logMessage( 'TEST MODE: Command: %r.' % cmd ) return True rc , stdout , stderr = pipe( cmd , g_mailText ) r = ( stdout or '' ).split() return r[ 0 ] != 'SPAM' #----------------------------------------------------------------------------- def errorNameToErrorCode( code ) : code = code.lower() if code == 'nouser' : return EX_NOUSER elif code == 'tempfail' : return EX_TEMPFAIL elif code == 'dataerr' : return EX_DATAERR else : try : return int( code ) except : return 0 #----------------------------------------------------------------------------- # # FIXME: I think it could be better to cache the parsed # configuration, and also to cache the result of the validator # so that we don't run the test each time this script is run ! # def readUserRules( user ) : filename = g_directoryRules + '/' + user + '.mf' # # Read the configuration. # try : return confparser.readConfiguration( filename ) except OSError , e : pass except Exception , e : logMessage( 'Error in file %r. See option -c to check this file.' % ( filename , ) ) #----------------------------------------------------------------------------- # # Test a match rule against a particular header. # def ruleMatch( header , matchType , text ) : if matchType == 'match' : try : return re.search( text , header , re.I ) != None except : logMessage( 'Error with regex `%s\' from %s\'s user configuration.' % ( text , g_user ) ) return False elif matchType == 'is' : return header.strip().lower() == text.strip().lower() elif matchType == 'contains' : return header.lower().find( text.strip().lower() ) != -1 else : logMessage( 'Unknown match type `%s\' from %s\'s user configuration.' % ( matchType , g_user ) ) return False # # Test rule 'rule' against the mail. # def testRule( rule ) : cmd = rule[ 0 ] if cmd == 'and' : return all( rule[ 2 ] , testRule ) if cmd == 'or' : return some( rule[ 2 ] , testRule ) if cmd == 'not' : return not some( rule[ 2 ] , testRule ) # # Matching a header # if cmd == 'header' : if g_mail == None : return False args = rule[ 1 ] headerName , matchType , text = args headers = map( normalizeBlank , g_mail.get_all( headerName ) or [] ) return some( headers , lambda header : ruleMatch( header , matchType , text ) ) # # Broken mail # if cmd == 'broken' : return g_mail == None # # Infected mail # if cmd == 'infected' : return g_mail != None and not antivirusScan() # # Spam mail # if cmd == 'spam' : return g_mail != None and not spamScan() # # Unknown rule # logMessage( 'Unknown rule name `%s\'.' % ( cmd , ) ) return False #----------------------------------------------------------------------------- # # Find the destination folder for user 'user' according to rules defined for # him/her against the current mail. # # Return an Action. # def checkUserRules( user ) : action = FileToFolderAction( None ) conf = readUserRules( user ) if not conf : logMessage( 'No rules defined for user `%s\'.' % user ) elif not conf[ 2 ] : logMessage( 'Empty rules set or syntax error encountered for user `%s\'.' % user ) else : for item in conf[ 2 ] : actionName , args , subs = item[ : 3 ] if some( subs , testRule ) : if actionName == 'folder' : action = FileToFolderAction( args[ 0 ] ) elif actionName == 'reject' : action = CustomErrorCodeAction( errorNameToErrorCode( args[ 0 ] ) ) else : logMessage( 'Unknown action `%s\'.' % actionName ) break return action #----------------------------------------------------------------------------- # # Read mail from standard input. # def readMail() : global g_mailText # # FIXME: Should we be reading the mail by block, so that # we can at least read and backup a part of the standard input # in case an error occur ? (broken pipe for example) # # If error occur, and since we can't backup the mail, # we ask sendmail to retry later. # try : g_mailText = sys.stdin.read() except : logMessage( getTraceBack() ) sys.exit( EX_TEMPFAIL ) # # Check if the mail is bigger than a predefined amount. # def checkForLargeMail() : if len( g_mailText ) > g_maxMailSize : logMessage( 'Message too big (%s bytes). Not filtering it.' % len( g_mailText ) ) rc = None if g_user : rc = deliverTo( g_user ) if rc != EX_OK : logMessage( 'Unable to deliver it to user `%s\'.' % g_user ) if rc != EX_OK : backup( g_user ) sys.exit( EX_OK ) # # Check if user was specified of command line. # def checkForUser() : # # No user specified. # if not g_user : logMessage( 'No user specified.' ) backup() sys.exit( EX_OK ) # # Parse the mail using email python standard module. # def parseMail() : global g_mail # # Parsing the mail. # try : g_mail = email.message_from_string( g_mailText , strict = False ) except : logMessage( getTraceBack() ) # # Dispatch the mail to the correct folder (deduced from rules.) # # Return an error code. # def dispatchMail() : action = checkUserRules( g_user ) # # If custom error code is returned, stop processing # here (mail is not saved.) # if isinstance( action , CustomErrorCodeAction ) : logMessage( 'Custom exit code is %d.' % r.code ) return action.code # # File the mail into the specified folder. # if isinstance( action , FileToFolderAction ) : # # We got a folder name (or None for default folder.) # folder = action.folder pseudoName = 'INBOX' if folder : pseudoName += '.' + folder # # Try to deliver in the named folder # rc = deliverTo( g_user , folder ) # # If we get an error code, then we deliver the mail to default folder, # except if the error was "data error" or if we already tried to deliver # it to default folder. # if rc not in [ EX_OK , EX_DATAERR ] and folder != None : logMessage( 'Error delivering to folder %s of user `%s\'.' % ( pseudoName , g_user ) ) logMessage( 'Mail will go into default folder.' ) rc = deliverTo( g_user ) # # Check again. # # Here we also handle the case of EX_DATAERR not handled above. # if rc != EX_OK : logMessage( 'Error delivering to default folder of user `%s\'.' % ( g_user , ) ) # # Since it's still not ok, backup the mail. # backup( g_user ) # # All errors code different from "data error" are translated to # "no user" error code. # # FIXME: Why?! # if rc != EX_DATAERR : rc = EX_NOUSER # # FIXME: !!!!! # rc = EX_OK return rc raise Exception( 'Unknown action type' ) # # # def process() : readMail() try : checkForUser() checkForLargeMail() parseMail() return dispatchMail() except : logMessage( getTraceBack() ) return EX_DATAERR #----------------------------------------------------------------------------- def checkConfiguration( filename ) : try : confparser.readConfiguration( filename ) except Exception , e : print e #----------------------------------------------------------------------------- def usage() : print '''Usage: mail.filter [OPTIONS] username < EMAIL -h, --help Print this help. -v, --verbose Verbose mode, output log to stdout. -t, --test Test mode. Don't feed the mail to Cyrus, don't do backup, and don't write anything into log file. -l, --log=FILENAME Set log filename. -r, --rules=DIRECTORY Directory where are located users rules. -c, --check-config=FILENAME Check syntax and structure of configuration file FILENAME. ''' print 'Current paths are:\n' print ' spamprobe : %s' % g_pathSpamProbe print ' clamd : %s' % g_pathClamdscan print ' deliver : %s' % g_pathCyrusDeliver print ' log : %s' % g_pathLog print print 'Current directories are:\n' print ' spamprobedb: %s' % g_pathSpamProbeDb print ' rules : %s' % g_directoryRules print ''' Latest version is available from: arch://arch.intra.tuxee.net/2004/mail-filter Report bugs to .''' def main() : global g_user, g_mail, g_mailText, g_copyLogToStdout, g_pathLog, g_directoryRules , g_testMode #--[ Command line ]------------------------------------------------------- import getopt try : _getopt = getopt.gnu_getopt except : _getopt = getopt.getopt try : options , parameters = \ _getopt( sys.argv[ 1 : ] , 'hvtl:r:c:' , ( 'help' , 'verbose' , 'test' , 'log=' , 'rules=' , 'check-config=' ) ) except getopt.GetoptError , e : myName = sys.argv[ 0 ].split( '/' )[ -1 ] print '%s: %s' % ( myName , e[ 0 ] ) print 'Try `%s --help\' for more information.' % myName sys.exit( 1 ) for option , argument in options : if option in [ '-h' , '--help' ] : usage() sys.exit( 0 ) elif option in [ '-v' , '--verbose' ] : g_copyLogToStdout = True elif option in [ '-t' , '--test' ] : g_testMode = True elif option in [ '-l' , '--log' ] : g_pathLog = argument elif option in [ '-r' , '--rules' ] : g_directoryRules = argument elif option in [ '-c' , '--check-config' ] : checkConfiguration( argument ) sys.exit( 0 ) # # At most one parameter expected. # if len( parameters ) > 1 : # # We just log a error message. We continue to proceed # to not lost the mail ! # logMessage( 'Warning: Expected only one user name.' ) if parameters : g_user = parameters[ 0 ] #--[ Core ]--------------------------------------------------------------- logMessage( 'Running mail.filter for user `%s\'.' % g_user ) return process() if __name__ == '__main__' : sys.exit( main() )