1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
|
import difflib
from .parser import ArchivesParser
from lib.log import log, opstatus
class ArchivesParserStorage(ArchivesParser):
def __init__(self):
super(ArchivesParserStorage, self).__init__()
self.purges = set()
def purge_list(self, listid, year, month):
self.purges.add((int(listid), int(year), int(month)))
def purge_thread(self, threadid):
self.purges.add(int(threadid))
def store(self, conn, listid, overwrite=False, overwrite_raw=False):
curs = conn.cursor()
# Potentially add the information that there exists a mail for
# this month. We do that this early since we're always going to
# make the check anyway, and this keeps the code in one place..
if not overwrite:
curs.execute("INSERT INTO list_months (listid, year, month) SELECT %(listid)s, %(year)s, %(month)s WHERE NOT EXISTS (SELECT listid FROM list_months WHERE listid=%(listid)s AND year=%(year)s AND month=%(month)s)", {
'listid': listid,
'year': self.date.year,
'month': self.date.month,
})
curs.execute("SELECT threadid, EXISTS(SELECT threadid FROM list_threads lt WHERE lt.listid=%(listid)s AND lt.threadid=m.threadid), id FROM messages m WHERE m.messageid=%(messageid)s", {
'messageid': self.msgid,
'listid': listid,
})
r = curs.fetchall()
if len(r) > 0:
# Has to be 1 row, since we have a unique index on id
if not r[0][1] and not overwrite:
log.status("Tagging message %s with list %s" % (self.msgid, listid))
curs.execute("INSERT INTO list_threads (threadid, listid) VALUES (%(threadid)s, %(listid)s)", {
'threadid': r[0][0],
'listid': listid,
})
opstatus.tagged += 1
self.purge_list(listid, self.date.year, self.date.month)
self.purge_thread(r[0][0])
else:
opstatus.dupes += 1
if overwrite:
pk = r[0][2]
self.purge_thread(r[0][0])
if overwrite_raw:
# For full overwrite, we also update the raw text of the message. This is an
# uncommon enough operation that we'll just do it as a separate command.
log.status("Full raw overwrite of %s" % self.msgid)
curs.execute("UPDATE messages SET rawtxt=%(raw)s WHERE id=%(id)s", {
'raw': bytearray(self.rawtxt),
'id': pk,
})
# Overwrite an existing message. We do not attempt to
# "re-thread" a message, we just update the contents. We
# do remove all attachments and rewrite them. Of course, we
# don't change the messageid (since it's our primary
# identifyer), and we don't update the raw text of the message.
# (since we are expected to have used that raw text to do
# the re-parsing initially)
# We update bodytext as a separate step so as not to rewrite
# the TOAST table unnecessarily...
curs.execute("UPDATE messages SET bodytxt=%(bodytxt)s WHERE id=%(id)s AND NOT (bodytxt=%(bodytxt)s) RETURNING id", {
'id': pk,
'bodytxt': self.bodytxt,
})
rc = curs.rowcount
curs.execute("UPDATE messages SET _from=%(from)s, _to=%(to)s, cc=%(cc)s, subject=%(subject)s, date=%(date)s, has_attachment=%(has_attachment)s WHERE id=%(id)s AND NOT (_from=%(from)s AND _to=%(to)s AND cc=%(cc)s AND subject=%(subject)s AND date=%(date)s AND has_attachment=%(has_attachment)s) RETURNING id", {
'id': pk,
'from': self._from,
'to': self.to or '',
'cc': self.cc or '',
'subject': self.subject or '',
'date': self.date,
'has_attachment': len(self.attachments) > 0,
})
rc += curs.rowcount
if rc == 0:
log.status("Message %s unchanged" % self.msgid)
return False
curs.execute("DELETE FROM attachments WHERE message=%(message)s", {
'message': pk,
})
if len(self.attachments):
curs.executemany("INSERT INTO attachments (message, filename, contenttype, attachment) VALUES (%(message)s, %(filename)s, %(contenttype)s, %(attachment)s)", [{
'message': pk,
'filename': a[0] or 'unknown_filename',
'contenttype': a[1],
'attachment': bytearray(a[2]),
} for a in self.attachments])
opstatus.overwritten += 1
log.status("Message %s overwritten" % self.msgid)
else:
log.status("Message %s already stored" % self.msgid)
return True
if overwrite:
raise Exception("Attempt to overwrite message (%s) that doesn't exist on list %s!" % (self.msgid, listid))
# Always purge the primary list for this thread
self.purge_list(listid, self.date.year, self.date.month)
# Resolve own thread
curs.execute("SELECT id, messageid, threadid FROM messages WHERE messageid=ANY(%(parents)s)", {
'parents': self.parents,
})
all_parents = curs.fetchall()
if len(all_parents):
# At least one of the parents exist. Now try to figure out which one
best_parent = len(self.parents) + 1
best_threadid = -1
best_parentid = None
for i in range(0, len(all_parents)):
for j in range(0, len(self.parents)):
if self.parents[j] == all_parents[i][1]:
# This messageid found. Better than the last one?
if j < best_parent:
best_parent = j
best_parentid = all_parents[i][0]
best_threadid = all_parents[i][2]
if best_threadid == -1:
raise Exception("Message %s, resolve failed in a way it shouldn't :P" % self.msgid)
self.parentid = best_parentid
self.threadid = best_threadid
# Slice away all matches that are worse than the one we wanted
self.parents = self.parents[:best_parent]
log.status("Message %s resolved to existing thread %s, waiting for %s better messages" % (self.msgid, self.threadid, len(self.parents)))
else:
# No parent exist. But don't create the threadid just yet, since
# it's possible that we're somebody elses parent!
self.parentid = None
self.threadid = None
# Now see if we are somebody elses *parent*...
curs.execute("SELECT message, priority, threadid FROM unresolved_messages INNER JOIN messages ON messages.id=unresolved_messages.message WHERE unresolved_messages.msgid=%(msgid)s ORDER BY threadid", {
'msgid': self.msgid,
})
childrows = curs.fetchall()
if len(childrows):
# We are some already existing message's parent (meaning the
# messages arrived out of order)
# In the best case, the threadid is the same for all threads.
# But it might be different if this it the "glue message" that's
# holding other threads together.
if self.threadid:
# Already have a threadid, means that we have a glue message
print("Message %s resolved to existing thread %s, while being somebodys parent" % (self.msgid, self.threadid))
else:
print("Message %s did not resolve to existing thread, but is somebodys parent" % self.msgid)
# In this case, just pick the first thread from the list and merge into that
# one.
self.threadid = childrows[0][2]
# Get a unique list (set) of all threads *except* the primary one,
# because we'll be merging into that one.
mergethreads = set([r[2] for r in childrows]).difference(set((self.threadid,)))
if len(mergethreads):
# We have one or more merge threads
log.status("Merging threads %s into thread %s" % (",".join(str(s) for s in mergethreads), self.threadid))
curs.execute("UPDATE messages SET threadid=%(threadid)s WHERE threadid=ANY(%(oldthreadids)s)", {
'threadid': self.threadid,
'oldthreadids': list(mergethreads),
})
# Insert any lists that were tagged on the merged threads
curs.execute("INSERT INTO list_threads (threadid, listid) SELECT DISTINCT %(threadid)s,listid FROM list_threads lt2 WHERE lt2.threadid=ANY(%(oldthreadids)s) AND listid NOT IN (SELECT listid FROM list_threads lt3 WHERE lt3.threadid=%(threadid)s)", {
'threadid': self.threadid,
'oldthreadids': list(mergethreads),
})
# Remove all old leftovers
curs.execute("DELETE FROM list_threads WHERE threadid=ANY(%(oldthreadids)s)", {
'oldthreadids': list(mergethreads),
})
# Purge varnish records for all the threads we just removed
for t in mergethreads:
self.purge_thread(t)
# Batch all the children for repointing. We can't do the actual
# repointing until later, since we don't know our own id yet.
self.children = [r[0] for r in childrows]
log.status("Children set to %s with mergethreads being %s (from childrows %s and threadid %s)" % (
self.children, mergethreads, childrows, self.threadid)
)
# Finally, remove all the pending messages that had a higher
# priority value (meaning less important) than us
curs.executemany("DELETE FROM unresolved_messages WHERE message=%(msg)s AND priority >= %(prio)s", [{
'msg': msg,
'prio': prio,
} for msg, prio, tid in childrows])
else:
self.children = []
if not self.threadid:
# No parent and no child exists - create a new threadid, just for us!
curs.execute("SELECT nextval('threadid_seq')")
self.threadid = curs.fetchall()[0][0]
log.status("Message %s resolved to no parent (out of %s) and no child, new thread %s" % (self.msgid, len(self.parents), self.threadid))
else:
# We have a threadid already, so we're not a new thread. Thus,
# we need to purge the old thread
self.purge_thread(self.threadid)
# Insert a thread tag if we're on a new list
curs.execute("INSERT INTO list_threads (threadid, listid) SELECT %(threadid)s, %(listid)s WHERE NOT EXISTS (SELECT * FROM list_threads t2 WHERE t2.threadid=%(threadid)s AND t2.listid=%(listid)s) RETURNING threadid", {
'threadid': self.threadid,
'listid': listid,
})
if len(curs.fetchall()):
log.status("Tagged thread %s with listid %s" % (self.threadid, listid))
curs.execute("INSERT INTO messages (parentid, threadid, _from, _to, cc, subject, date, has_attachment, messageid, bodytxt, rawtxt) VALUES (%(parentid)s, %(threadid)s, %(from)s, %(to)s, %(cc)s, %(subject)s, %(date)s, %(has_attachment)s, %(messageid)s, %(bodytxt)s, %(rawtxt)s) RETURNING id", {
'parentid': self.parentid,
'threadid': self.threadid,
'from': self._from,
'to': self.to or '',
'cc': self.cc or '',
'subject': self.subject or '',
'date': self.date,
'has_attachment': len(self.attachments) > 0,
'messageid': self.msgid,
'bodytxt': self.bodytxt,
'rawtxt': bytearray(self.rawtxt),
})
id = curs.fetchall()[0][0]
log.status("Message %s, got id %s, set thread %s, parent %s" % (
self.msgid, id, self.threadid, self.parentid))
if len(self.attachments):
# Insert attachments
curs.executemany("INSERT INTO attachments (message, filename, contenttype, attachment) VALUES (%(message)s, %(filename)s, %(contenttype)s, %(attachment)s)", [{
'message': id,
'filename': a[0] or 'unknown_filename',
'contenttype': a[1],
'attachment': bytearray(a[2]),
} for a in self.attachments])
if len(self.children):
log.status("Setting %s other messages to children of %s" % (len(self.children), self.msgid))
curs.executemany("UPDATE messages SET parentid=%(parent)s WHERE id=%(id)s",
[{'parent': id, 'id': c} for c in self.children])
if len(self.parents):
# There are remaining parents we'd rather have to get ourselves
# properly threaded - so store them in the db.
curs.executemany("INSERT INTO unresolved_messages (message, priority, msgid) VALUES (%(id)s, %(priority)s, %(msgid)s)",
[{'id': id, 'priority': i, 'msgid': self.parents[i]} for i in range(0, len(self.parents))])
opstatus.stored += 1
return True
def diff(self, conn, f, fromonlyf, oldid):
curs = conn.cursor()
# Fetch the old one so we have something to diff against
curs.execute("SELECT id, _from, _to, cc, subject, date, has_attachment, bodytxt FROM messages WHERE messageid=%(msgid)s", {
'msgid': self.msgid,
})
try:
id, _from, to, cc, subject, date, has_attachment, bodytxt = curs.fetchone()
except TypeError as e:
f.write("---- %s ----\n" % self.msgid)
f.write("Could not re-find in archives (old id was %s): %s\n" % (oldid, e))
f.write("\n-------------------------------\n\n")
return
if (_from.rstrip(), to.rstrip(), cc.rstrip(), subject.rstrip(), date) != (self._from, self.to, self.cc, self.subject, self.date):
log.status("Message %s has header changes " % self.msgid)
f.write("==== %s ====\n" % self.msgid)
for fn in ['_from', 'to', 'cc', 'subject', 'date']:
if getattr(self, fn) != eval(fn):
s = "- {0}: {1}\n".format(fn, eval(fn))
d = "+ {0}: {1}\n".format(fn, getattr(self, fn))
f.write(s)
f.write(d)
f.write("\n\n")
if bodytxt != self.bodytxt:
log.status("Message %s has body changes " % self.msgid)
tempdiff = list(difflib.unified_diff(bodytxt.splitlines(),
self.bodytxt.splitlines(),
fromfile='old',
tofile='new',
n=0,
lineterm=''))
if (len(tempdiff) - 2) % 3 == 0:
# 3 rows to a diff, two header rows.
# Then verify that each slice of 3 contains one @@ row (header), one -From and one +>From,
# which indicates the only change is in the From.
ok = True
tempdiff = tempdiff[2:]
while tempdiff:
a, b, c = (tempdiff.pop(0), tempdiff.pop(0), tempdiff.pop(0))
if not (a.startswith('@@ ') and b.startswith('-From ') and c.startswith('+>From ')):
ok = False
break
if ok:
fromonlyf.write("%s\n" % self.msgid)
return
# Generate a nicer diff
d = list(difflib.unified_diff(bodytxt.splitlines(),
self.bodytxt.splitlines(),
fromfile='old',
tofile='new',
n=0,
lineterm=''))
if len(d) > 0:
f.write("---- %s ----\n" % self.msgid)
f.write("\n".join(d))
f.write("\n\n")
else:
log.status("Message %s unchanged." % self.msgid)
|