nixos-mailserver/scripts/mail-check.py

204 lines
7.5 KiB
Python
Raw Normal View History

import smtplib, sys
import argparse
import os
import uuid
import imaplib
from datetime import datetime, timedelta
import email
import email.utils
import time
RETRY = 100
def _send_mail(smtp_host, smtp_port, smtp_username, from_addr, from_pwd, to_addr, subject, starttls):
print("Sending mail with subject '{}'".format(subject))
message = "\n".join([
"From: {from_addr}",
"To: {to_addr}",
"Subject: {subject}",
"Message-ID: {random}@mail-check.py",
"Date: {date}",
"",
"This validates our mail server can send to Gmail :/"]).format(
from_addr=from_addr,
to_addr=to_addr,
subject=subject,
random=str(uuid.uuid4()),
date=email.utils.formatdate(),
)
retry = RETRY
while True:
try:
with smtplib.SMTP(smtp_host, port=smtp_port) as smtp:
try:
if starttls:
smtp.starttls()
if from_pwd is not None:
smtp.login(smtp_username or from_addr, from_pwd)
smtp.sendmail(from_addr, [to_addr], message)
return
except smtplib.SMTPResponseException as e:
if e.smtp_code == 451: # service unavailable error
print(e)
elif e.smtp_code == 454: # smtplib.SMTPResponseException: (454, b'4.3.0 Try again later')
print(e)
else:
raise
except OSError as e:
if e.errno in [16, -2]:
print("OSError exception message: ", e)
else:
raise
if retry > 0:
retry = retry - 1
time.sleep(1)
print("Retrying")
else:
print("Retry attempts exhausted")
exit(5)
def _read_mail(
imap_host,
imap_port,
imap_username,
to_pwd,
subject,
ignore_dkim_spf,
show_body=False,
delete=True):
print("Reading mail from %s" % imap_username)
message = None
obj = imaplib.IMAP4_SSL(imap_host, imap_port)
obj.login(imap_username, to_pwd)
obj.select()
today = datetime.today()
cutoff = today - timedelta(days=1)
dt = cutoff.strftime('%d-%b-%Y')
for _ in range(0, RETRY):
print("Retrying")
obj.select()
typ, data = obj.search(None, '(SINCE %s) (SUBJECT "%s")'%(dt, subject))
if data == [b'']:
time.sleep(1)
continue
uids = data[0].decode("utf-8").split(" ")
if len(uids) != 1:
print("Warning: %d messages have been found with subject containing %s " % (len(uids), subject))
# FIXME: we only consider the first matching message...
uid = uids[0]
_, raw = obj.fetch(uid, '(RFC822)')
if delete:
obj.store(uid, '+FLAGS', '\\Deleted')
obj.expunge()
message = email.message_from_bytes(raw[0][1])
print("Message with subject '%s' has been found" % message['subject'])
if show_body:
for m in message.get_payload():
if m.get_content_type() == 'text/plain':
print("Body:\n%s" % m.get_payload(decode=True).decode('utf-8'))
break
if message is None:
print("Error: no message with subject '%s' has been found in INBOX of %s" % (subject, imap_username))
exit(1)
if ignore_dkim_spf:
return
# gmail set this standardized header
if 'ARC-Authentication-Results' in message:
if "dkim=pass" in message['ARC-Authentication-Results']:
print("DKIM ok")
else:
print("Error: no DKIM validation found in message:")
print(message.as_string())
exit(2)
if "spf=pass" in message['ARC-Authentication-Results']:
print("SPF ok")
else:
print("Error: no SPF validation found in message:")
print(message.as_string())
exit(3)
else:
print("DKIM and SPF verification failed")
exit(4)
def send_and_read(args):
src_pwd = None
if args.src_password_file is not None:
src_pwd = args.src_password_file.readline().rstrip()
dst_pwd = args.dst_password_file.readline().rstrip()
if args.imap_username != '':
imap_username = args.imap_username
else:
imap_username = args.to_addr
subject = "{}".format(uuid.uuid4())
_send_mail(smtp_host=args.smtp_host,
smtp_port=args.smtp_port,
smtp_username=args.smtp_username,
from_addr=args.from_addr,
from_pwd=src_pwd,
to_addr=args.to_addr,
subject=subject,
starttls=args.smtp_starttls)
_read_mail(imap_host=args.imap_host,
imap_port=args.imap_port,
imap_username=imap_username,
to_pwd=dst_pwd,
subject=subject,
ignore_dkim_spf=args.ignore_dkim_spf)
def read(args):
_read_mail(imap_host=args.imap_host,
imap_port=args.imap_port,
to_addr=args.imap_username,
to_pwd=args.imap_password,
subject=args.subject,
ignore_dkim_spf=args.ignore_dkim_spf,
show_body=args.show_body,
delete=False)
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
parser_send_and_read = subparsers.add_parser('send-and-read', description="Send a email with a subject containing a random UUID and then try to read this email from the recipient INBOX.")
parser_send_and_read.add_argument('--smtp-host', type=str)
parser_send_and_read.add_argument('--smtp-port', type=str, default=25)
parser_send_and_read.add_argument('--smtp-starttls', action='store_true')
parser_send_and_read.add_argument('--smtp-username', type=str, default='', help="username used for smtp login. If not specified, the from-addr value is used")
parser_send_and_read.add_argument('--from-addr', type=str)
parser_send_and_read.add_argument('--imap-host', required=True, type=str)
parser_send_and_read.add_argument('--imap-port', type=str, default=993)
parser_send_and_read.add_argument('--to-addr', type=str, required=True)
parser_send_and_read.add_argument('--imap-username', type=str, default='', help="username used for imap login. If not specified, the to-addr value is used")
parser_send_and_read.add_argument('--src-password-file', type=argparse.FileType('r'))
parser_send_and_read.add_argument('--dst-password-file', required=True, type=argparse.FileType('r'))
parser_send_and_read.add_argument('--ignore-dkim-spf', action='store_true', help="to ignore the dkim and spf verification on the read mail")
parser_send_and_read.set_defaults(func=send_and_read)
parser_read = subparsers.add_parser('read', description="Search for an email with a subject containing 'subject' in the INBOX.")
parser_read.add_argument('--imap-host', type=str, default="localhost")
parser_read.add_argument('--imap-port', type=str, default=993)
parser_read.add_argument('--imap-username', required=True, type=str)
parser_read.add_argument('--imap-password', required=True, type=str)
parser_read.add_argument('--ignore-dkim-spf', action='store_true', help="to ignore the dkim and spf verification on the read mail")
parser_read.add_argument('--show-body', action='store_true', help="print mail text/plain payload")
parser_read.add_argument('subject', type=str)
parser_read.set_defaults(func=read)
args = parser.parse_args()
args.func(args)