summaryrefslogtreecommitdiff
path: root/postgresqleu/util/payment/bankfile.py
blob: 43459b83f50399acd6593bffd5ca9f3b987b563a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3

import sys
import json
import csv
import datetime
from decimal import Decimal


class BankFileParser(object):
    """Parse a bank transaction export according to a specification dict.

    The specification (``spec``) is a dict with the following keys, as read
    by the code below:

    * ``type``: the only supported value is ``'delimited'``.
    * ``delimiter``: field delimiter handed to ``csv.reader``.
    * ``skiprows`` (optional, default 0): number of leading rows to discard.
    * ``firstisheader``: if true, the first row after the skipped rows is
      treated as a header row and is not parsed as data.
    * ``validateheader``: if true, the header row must have exactly one entry
      per configured column and each entry must be listed in that column's
      ``header`` list.
    * ``columns``: list of column definitions, matched positionally against
      each row. Each one has a ``function`` (``ignore``, ``uniqueid``,
      ``date``, ``text``, ``amount``, ``balance``, ``validate`` or
      ``store``), a ``header`` list, and optionally ``format`` (``decimal``
      or ``date``) with its companions ``decimal`` (decimal separator) and
      ``dateformat`` (``strptime`` pattern). ``validate`` columns also carry
      a ``validate`` string.
    * ``delayincomingdays`` (optional, default 0): hold back rows with a
      positive amount until they are at least this many days old.
    """

    # Keys that every parsed row must end up with.
    MANDATORY_ATTRIBUTES = ['date', 'amount', 'text']

    def __init__(self, spec):
        """Store the specification dict; nothing is validated up front."""
        self.spec = spec

    def parse(self, contents):
        """Yield one dict per transaction row found in ``contents``.

        ``contents`` is the complete file as a string. As this is a
        generator, errors (including an unknown ``type``) are raised once
        iteration starts, not when the method is called.
        """
        if self.spec['type'] == 'delimited':
            yield from self.parse_delimited(contents)
        else:
            raise Exception("Unknown type %s" % self.spec['type'])

    def parse_delimited(self, contents):
        """Yield parsed rows from delimiter-separated ``contents``.

        Each yielded dict always has the keys ``other`` and ``validate``
        (both dicts) plus every key in MANDATORY_ATTRIBUTES; ``uniqueid``
        and ``balance`` are present when a column with that function is
        configured.

        Raises Exception on a header mismatch, on an unknown column
        function, or when a mandatory attribute is missing from a row.
        """
        reader = csv.reader(contents.splitlines(), delimiter=self.spec['delimiter'])

        toskip = self.spec.get('skiprows', 0)
        # If there is no header row, pretend like it's already been found.
        foundheader = not self.spec['firstisheader']

        for row in reader:
            if toskip > 0:
                toskip -= 1
                continue

            if not foundheader:
                # This is the header. If we're not supposed to validate it, just skip, but
                # otherwise validate each column.
                if self.spec['validateheader']:
                    if len(self.spec['columns']) != len(row):
                        raise Exception("Found {} columns in header, expected {}".format(len(row), len(self.spec['columns'])))
                    for col, header in zip(self.spec['columns'], row):
                        if header not in col['header']:
                            raise Exception("Column {} in file was supposed to be one of {}".format(header, col['header']))

                foundheader = True
                continue

            if not row:
                # Completely empty row?
                continue

            # Now parse the actual data
            obj = {
                'other': {},
                'validate': {},
            }
            for col, val in zip(self.spec['columns'], row):
                function = col['function']
                if function == 'ignore':
                    continue
                elif function == 'uniqueid':
                    obj['uniqueid'] = str(self.parse_value(col, val))
                elif function == 'date':
                    obj['date'] = self.parse_value(col, val, 'date')
                elif function == 'text':
                    obj['text'] = str(self.parse_value(col, val))
                elif function == 'amount':
                    obj['amount'] = self.parse_value(col, val, 'decimal')
                elif function == 'balance':
                    obj['balance'] = self.parse_value(col, val, 'decimal')
                elif function == 'validate':
                    key = col['header'][0].lower()
                    parsed = self.parse_value(col, val)
                    obj['validate'][key] = {
                        'val': str(parsed),
                        'validate': col['validate'].lower(),
                    }
                    # We also store the validated values, for possible future
                    # needs.
                    obj['other'][key] = parsed
                elif function == 'store':
                    obj['other'][col['header'][0].lower()] = self.parse_value(col, val)
                else:
                    raise Exception("Unknown column function {}".format(function))

            for a in self.MANDATORY_ATTRIBUTES:
                if a not in obj:
                    raise Exception("Mandatory attribute {} not found".format(a))

            delaydays = int(self.spec.get('delayincomingdays', '0'))
            if delaydays > 0:
                # Any *incoming* transactions are delayed for <n> days before they are
                # loaded, to handle some banks that initially give partial information
                # about the transactions and backfill it later with no other changes.
                if obj['amount'] > 0:
                    if (datetime.datetime.today().date() - obj['date']).days < delaydays:
                        # Just ignore the row, don't report it as error
                        continue

            yield obj

    def parse_value(self, col, val, mustbeformat=None):
        """Convert the raw string ``val`` according to the column definition.

        Returns a Decimal for format ``decimal``, a ``datetime.date`` for
        format ``date``, and a plain string when the column has no format.

        If ``mustbeformat`` is given, the column's configured format has to
        match it exactly; otherwise an Exception is raised. An Exception is
        also raised for an unknown format.
        """
        if mustbeformat and col.get('format', '**unknown**') != mustbeformat:
            raise Exception("Column {} must be format {}".format(col['header'][0], mustbeformat))

        if 'format' in col:
            if col['format'] == 'decimal':
                if col.get('decimal', '.') != '.':
                    # Normalize a non-standard decimal separator before converting
                    return Decimal(val.replace(col['decimal'], '.'))
                else:
                    return Decimal(val)
            elif col['format'] == 'date':
                return datetime.datetime.strptime(val, col['dateformat']).date()
            else:
                raise Exception("Unknown column format {}".format(col['format']))
        else:
            # Just treat it as a string
            return str(val)


# Command line entry point: parse an import file using a JSON definition and
# print every resulting transaction dict, one per line.
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: bankfile.py <definition.json> <import.txt>")
        sys.exit(1)

    with open(sys.argv[1]) as f:
        parser = BankFileParser(json.load(f))

    # The parser works on the complete file contents as a string (it calls
    # splitlines() on it and feeds the lines to csv.reader), so open the file
    # in text mode and pass what was read rather than the file object itself.
    # NOTE(review): this decodes using the platform default encoding; confirm
    # that matches the files the bank delivers.
    with open(sys.argv[2]) as f:
        for obj in parser.parse(f.read()):
            print(obj)