summaryrefslogtreecommitdiff
path: root/resender/archives_resender.py
blob: 061ea16feb00fc3c334989a55d608b00ea791b6b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/python3 -u
#
# archives_resender.py - resend messages to authenticated users
#
# This script is intended to be run as a daemon.
#


import os
import sys
import select
import smtplib
from configparser import ConfigParser
import psycopg2


def process_queue(conn, sender, smtpserver, heloname):
    with conn.cursor() as curs:
        curs.execute("SELECT r.id, u.email, m.rawtxt FROM mailarchives_resendmessage r INNER JOIN auth_user u ON u.id=r.sendto_id INNER JOIN messages m ON m.id=r.message_id ORDER BY r.id FOR UPDATE OF r LIMIT 1")
        ll = curs.fetchall()
        if len(ll) == 0:
            conn.rollback()
            return False

        recipient = ll[0][1]
        contents = ll[0][2]
        if contents[0:5].tobytes() == b'From ':
            # If the first line is the From header, strip it out before sending anything
            # Try ot find the end of it. We'll look for a 1k long line, and if that fails,
            # we just give up.
            first = contents[:1024].tobytes()
            ofs = first.find(b'\n')
            if ofs < 0:
                raise Exception("Found start of From line but could not find end of it. Failing to send email to {0}!".format(recipient))

            contents = contents[ofs + 1:]

        try:
            # Actually resend! New SMTP connection for each message because we're not sending
            # that many.
            smtp = smtplib.SMTP(smtpserver, local_hostname=heloname)
            smtp.sendmail(sender, recipient, contents)
            smtp.close()
        except Exception as e:
            sys.stderr.write("Error sending email to {0}: {1}\n".format(recipient, e))

            # Fall through and just delete the email, we never make more than one attempt

        curs.execute("DELETE FROM mailarchives_resendmessage WHERE id=%(id)s", {
            'id': ll[0][0],
        })
        conn.commit()
        return True


if __name__ == "__main__":
    cfg = ConfigParser()
    cfg.read(os.path.join(os.path.realpath(os.path.dirname(sys.argv[0])), '../loader', 'archives.ini'))
    if not cfg.has_option('smtp', 'server'):
        print("Must specify server under smtp in configuration")
        sys.exit(1)
    if not cfg.has_option('smtp', 'heloname'):
        print("Must specify heloname under smtp in configuration")
        sys.exit(1)
    if not cfg.has_option('smtp', 'resender'):
        print("Must specify resender under smtp in configuration")
        sys.exit(1)

    smtpserver = cfg.get('smtp', 'server')
    heloname = cfg.get('smtp', 'heloname')
    sender = cfg.get('smtp', 'resender')

    conn = psycopg2.connect(cfg.get('db', 'connstr') + ' application_name=archives_resender')

    curs = conn.cursor()

    curs.execute("LISTEN archives_resend")
    conn.commit()

    while True:
        # Process everything in the queue now
        while True:
            if not process_queue(conn, sender, smtpserver, heloname):
                break

        # Wait for a NOTIFY. Poll every 5 minutes.
        select.select([conn], [], [], 5 * 60)

        # Eat up all notifications, since we're just going to process
        # all pending messages until the queue is empty.
        conn.poll()
        while conn.notifies:
            conn.notifies.pop()