# Mirror of https://gitlab.com/simple-nixos-mailserver/nixos-mailserver.git
# Synced 2025-05-17 08:00:49 +05:00
# 257 lines, 7.9 KiB, Python
import argparse
|
|
import email
|
|
import email.utils
|
|
import imaplib
|
|
import smtplib
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from typing import cast
|
|
|
|
# Retry budget shared by _send_mail (number of re-sends after a transient
# failure) and _read_mail (number of mailbox search attempts). Both wait one
# second between tries.
RETRY = 100
|
|
|
|
|
|
def _send_mail(
    smtp_host, smtp_port, smtp_username, from_addr, from_pwd, to_addr, subject, starttls
):
    """Send a small plain-text probe mail over SMTP, retrying transient errors.

    Args:
        smtp_host: Host name of the SMTP server to connect to.
        smtp_port: Port of the SMTP server.
        smtp_username: Login name; when empty, ``from_addr`` is used instead.
        from_addr: Envelope sender and ``From:`` header value.
        from_pwd: Password for the SMTP login, or ``None`` to skip login.
        to_addr: Envelope recipient and ``To:`` header value.
        subject: Value of the ``Subject:`` header.
        starttls: When true, issue STARTTLS before logging in.

    SMTP replies 451 and 454 and ``OSError`` with errno 16 or -2 are treated
    as transient: the error is printed and the whole connection is retried
    after one second, up to ``RETRY`` times. Any other error propagates.
    When the retries are exhausted the process exits with status 5.
    """
    print(f"Sending mail with subject '{subject}'")
    message = "\n".join(
        [
            f"From: {from_addr}",
            f"To: {to_addr}",
            f"Subject: {subject}",
            # RFC 5322 requires the message identifier to be enclosed in
            # angle brackets; a bare "id@domain" is a malformed header.
            f"Message-ID: <{uuid.uuid4()}@mail-check.py>",
            f"Date: {email.utils.formatdate()}",
            "",
            "This validates our mail server can send to Gmail :/",
        ]
    )

    retry = RETRY
    while True:
        try:
            with smtplib.SMTP(smtp_host, port=smtp_port) as smtp:
                try:
                    if starttls:
                        smtp.starttls()
                    if from_pwd is not None:
                        smtp.login(smtp_username or from_addr, from_pwd)

                    smtp.sendmail(from_addr, [to_addr], message)
                    return
                except smtplib.SMTPResponseException as e:
                    # 451: service unavailable error
                    # 454: e.g. (454, b'4.3.0 Try again later')
                    if e.smtp_code not in (451, 454):
                        raise
                    print(e)
        except OSError as e:
            # NOTE(review): 16 and -2 look like EBUSY and the resolver's
            # "name or service not known" code on Linux - confirm.
            if e.errno not in (16, -2):
                raise
            print("OSError exception message: ", e)

        if retry > 0:
            retry = retry - 1
            time.sleep(1)
            print("Retrying")
        else:
            print("Retry attempts exhausted")
            exit(5)
|
|
|
|
|
|
def _read_mail(
    imap_host,
    imap_port,
    imap_username,
    to_pwd,
    subject,
    ignore_dkim_spf,
    show_body=False,
    delete=True,
):
    """Poll an IMAP mailbox for a message by subject and check DKIM/SPF.

    Args:
        imap_host: Host name of the IMAP server (connected to over SSL).
        imap_port: Port of the IMAP server.
        imap_username: Login name for the IMAP account.
        to_pwd: Password for the IMAP account.
        subject: Text the message subject must contain.
        ignore_dkim_spf: When true, return as soon as the message is found
            without inspecting its authentication results.
        show_body: When true, print the text/plain payload(s) of the message.
        delete: When true, delete and expunge the message after fetching it.

    The mailbox is searched up to ``RETRY`` times, one second apart. The
    process exits with status 1 when no message is found, 2 when
    ``dkim=pass`` is missing, 3 when ``spf=pass`` is missing, and 4 when the
    message has no ``ARC-Authentication-Results`` header at all.
    """
    print(f"Reading mail from {imap_username}")

    message = None

    # Only messages from the last day are considered.
    cutoff = datetime.today() - timedelta(days=1)
    dt = cutoff.strftime("%d-%b-%Y")

    # The context manager logs out of the server on every exit path,
    # including the exit() calls and exceptions raised below.
    with imaplib.IMAP4_SSL(imap_host, imap_port) as obj:
        obj.login(imap_username, to_pwd)
        obj.select()

        for attempt in range(RETRY):
            if attempt > 0:
                print("Retrying")
            # Presumably re-selecting refreshes the mailbox view so that
            # newly delivered messages show up in the search.
            obj.select()
            _, data = obj.search(None, f'(SINCE {dt}) (SUBJECT "{subject}")')
            if data == [b""]:
                time.sleep(1)
                continue

            # search() yields message sequence numbers separated by spaces.
            msg_nums = data[0].decode("utf-8").split(" ")
            if len(msg_nums) != 1:
                print(
                    f"Warning: {len(msg_nums)} messages have been found with subject containing {subject}"
                )

            # FIXME: we only consider the first matching message...
            msg_num = msg_nums[0]
            _, raw = obj.fetch(msg_num, "(RFC822)")
            if delete:
                obj.store(msg_num, "+FLAGS", "\\Deleted")
                obj.expunge()
            assert raw[0] and raw[0][1]
            message = email.message_from_bytes(cast(bytes, raw[0][1]))
            print(f"Message with subject '{message['subject']}' has been found")
            if show_body:
                if message.is_multipart():
                    for part in message.walk():
                        ctype = part.get_content_type()
                        if ctype == "text/plain":
                            body = cast(bytes, part.get_payload(decode=True)).decode()
                            print(f"Body:\n{body}")
                        else:
                            print(f"Body with content type {ctype} not printed")
                else:
                    body = cast(bytes, message.get_payload(decode=True)).decode()
                    print(f"Body:\n{body}")
            break

    if message is None:
        print(
            f"Error: no message with subject '{subject}' has been found in INBOX of {imap_username}"
        )
        exit(1)

    if ignore_dkim_spf:
        return

    # gmail set this standardized header
    if "ARC-Authentication-Results" in message:
        if "dkim=pass" in message["ARC-Authentication-Results"]:
            print("DKIM ok")
        else:
            print("Error: no DKIM validation found in message:")
            print(message.as_string())
            exit(2)
        if "spf=pass" in message["ARC-Authentication-Results"]:
            print("SPF ok")
        else:
            print("Error: no SPF validation found in message:")
            print(message.as_string())
            exit(3)
    else:
        print("DKIM and SPF verification failed")
        exit(4)
|
|
|
|
|
|
def send_and_read(args):
    """Handle the ``send-and-read`` subcommand.

    Sends a mail whose subject is a fresh random UUID, then looks for that
    mail in the recipient's mailbox. Passwords are taken from the first line
    of the given password files; the source password file is optional.
    """
    src_file = args.src_password_file
    src_pwd = src_file.readline().rstrip() if src_file is not None else None
    dst_pwd = args.dst_password_file.readline().rstrip()

    # Fall back to the recipient address when no IMAP user name was given.
    imap_username = args.imap_username if args.imap_username != "" else args.to_addr

    subject = str(uuid.uuid4())

    send_options = {
        "smtp_host": args.smtp_host,
        "smtp_port": args.smtp_port,
        "smtp_username": args.smtp_username,
        "from_addr": args.from_addr,
        "from_pwd": src_pwd,
        "to_addr": args.to_addr,
        "subject": subject,
        "starttls": args.smtp_starttls,
    }
    _send_mail(**send_options)

    read_options = {
        "imap_host": args.imap_host,
        "imap_port": args.imap_port,
        "imap_username": imap_username,
        "to_pwd": dst_pwd,
        "subject": subject,
        "ignore_dkim_spf": args.ignore_dkim_spf,
    }
    _read_mail(**read_options)
|
|
|
|
|
|
def read(args):
    """Handle the ``read`` subcommand.

    Searches the mailbox for a message whose subject contains
    ``args.subject`` and never deletes the message it finds.
    """
    options = {
        "imap_host": args.imap_host,
        "imap_port": args.imap_port,
        "imap_username": args.imap_username,
        "to_pwd": args.imap_password,
        "subject": args.subject,
        "ignore_dkim_spf": args.ignore_dkim_spf,
        "show_body": args.show_body,
        "delete": False,
    }
    _read_mail(**options)
|
|
|
|
|
|
# Command-line interface: two subcommands, each bound to its handler through
# set_defaults(func=...).
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()

# "send-and-read": send a probe mail, then look for it in the recipient INBOX.
parser_send_and_read = subparsers.add_parser(
    "send-and-read",
    description="Send a email with a subject containing a random UUID and then try to read this email from the recipient INBOX.",
)
parser_send_and_read.add_argument("--smtp-host", type=str)
parser_send_and_read.add_argument("--smtp-port", type=str, default=25)
parser_send_and_read.add_argument("--smtp-starttls", action="store_true")
parser_send_and_read.add_argument(
    "--smtp-username",
    type=str,
    default="",
    help="username used for smtp login. If not specified, the from-addr value is used",
)
parser_send_and_read.add_argument("--from-addr", type=str)
parser_send_and_read.add_argument("--imap-host", required=True, type=str)
parser_send_and_read.add_argument("--imap-port", type=str, default=993)
parser_send_and_read.add_argument("--to-addr", type=str, required=True)
parser_send_and_read.add_argument(
    "--imap-username",
    type=str,
    default="",
    help="username used for imap login. If not specified, the to-addr value is used",
)
parser_send_and_read.add_argument("--src-password-file", type=argparse.FileType("r"))
parser_send_and_read.add_argument(
    "--dst-password-file", required=True, type=argparse.FileType("r")
)
parser_send_and_read.add_argument(
    "--ignore-dkim-spf",
    action="store_true",
    help="to ignore the dkim and spf verification on the read mail",
)
parser_send_and_read.set_defaults(func=send_and_read)

# "read": only search the mailbox for an existing message.
parser_read = subparsers.add_parser(
    "read",
    description="Search for an email with a subject containing 'subject' in the INBOX.",
)
parser_read.add_argument("--imap-host", type=str, default="localhost")
parser_read.add_argument("--imap-port", type=str, default=993)
parser_read.add_argument("--imap-username", required=True, type=str)
parser_read.add_argument("--imap-password", required=True, type=str)
parser_read.add_argument(
    "--ignore-dkim-spf",
    action="store_true",
    help="to ignore the dkim and spf verification on the read mail",
)
parser_read.add_argument(
    "--show-body", action="store_true", help="print mail text/plain payload"
)
parser_read.add_argument("subject", type=str)
parser_read.set_defaults(func=read)

args = parser.parse_args()
# Without a subcommand no handler is attached to the namespace; report a
# usage error instead of failing with AttributeError on args.func.
if not hasattr(args, "func"):
    parser.error("a subcommand is required: send-and-read or read")
args.func(args)
|