From 6d2b2b5b06cc58041064dd2feb8387483370f31c Mon Sep 17 00:00:00 2001 From: Gabriel Augendre Date: Sat, 22 Feb 2020 15:25:51 +0100 Subject: [PATCH] Finish refactor into processing classes --- .../bpvf_processor/bpvf_processor.py | 40 +++++++++++ ofx_processor/bpvf_processor/main.py | 65 +---------------- ofx_processor/revolut_processor/main.py | 72 ++----------------- .../revolut_processor/revolut_processor.py | 48 +++++++++++++ ofx_processor/utils/processor.py | 53 ++++++++++++++ tests/test_bpvf_processor.py | 11 ++- tests/test_revolut_processor.py | 23 +++--- 7 files changed, 162 insertions(+), 150 deletions(-) create mode 100644 ofx_processor/bpvf_processor/bpvf_processor.py create mode 100644 ofx_processor/revolut_processor/revolut_processor.py create mode 100644 ofx_processor/utils/processor.py diff --git a/ofx_processor/bpvf_processor/bpvf_processor.py b/ofx_processor/bpvf_processor/bpvf_processor.py new file mode 100644 index 0000000..8670568 --- /dev/null +++ b/ofx_processor/bpvf_processor/bpvf_processor.py @@ -0,0 +1,40 @@ +import re + +from ofx_processor.utils.processor import Processor, Line + + +def _process_name_and_memo(name, memo): + if "CB****" in name: + conversion = re.compile(r"\d+,\d{2}[a-zA-Z]{3}") + match = conversion.search(memo) + if match: + res_name = memo[: match.start() - 1] + res_memo = name + memo[match.start() - 1 :] + else: + res_name = memo + res_memo = name + + return res_name, res_memo + + return name, memo + + +class BpvfLine(Line): + def __init__(self, data=None): + super(BpvfLine, self).__init__(data) + + def get_date(self): + return self.data.dtposted.isoformat().split("T")[0] + + def get_amount(self): + return int(self.data.trnamt * 1000) + + def get_memo(self): + return _process_name_and_memo(self.data.name, self.data.memo)[1] + + def get_payee(self): + return _process_name_and_memo(self.data.name, self.data.memo)[0] + + +class BpvfProcessor(Processor): + line_class = BpvfLine diff --git a/ofx_processor/bpvf_processor/main.py b/ofx_processor/bpvf_processor/main.py index c135e8d..c491b51 100644 --- a/ofx_processor/bpvf_processor/main.py +++ b/ofx_processor/bpvf_processor/main.py @@ -1,10 +1,9 @@ -import re import sys -from collections import defaultdict import click from ofxtools.Parser import OFXTree +from ofx_processor.bpvf_processor.bpvf_processor import BpvfProcessor from ofx_processor.utils import ynab @@ -24,67 +23,9 @@ def cli(ofx_filename): click.secho("Couldn't parse ofx file", fg="red") sys.exit(1) - ynab_transactions = [] - transaction_ids = defaultdict(int) + processor = BpvfProcessor(ofx.statements[0].transactions) + ynab_transactions = processor.get_transactions() - for transaction in ofx.statements[0].transactions: - ynab_transaction = line_to_ynab_transaction(transaction, transaction_ids) - ynab_transactions.append(ynab_transaction) click.secho(f"Processed {len(ynab_transactions)} transactions total.", fg="blue") ynab.push_transactions(ynab_transactions, "bpvf") - - -def _process_name_and_memo(name, memo): - if "CB****" in name: - conversion = re.compile(r"\d+,\d{2}[a-zA-Z]{3}") - match = conversion.search(memo) - if match: - res_name = memo[: match.start() - 1] - res_memo = name + memo[match.start() - 1 :] - else: - res_name = memo - res_memo = name - - return res_name, res_memo - - return name, memo - - -def process_payee(line): - return _process_name_and_memo(line.name, line.memo)[0] - - -def process_memo(line): - return _process_name_and_memo(line.name, line.memo)[1] - - -def line_to_ynab_transaction(line, transaction_ids): - date = process_date(line) - payee = process_payee(line) - memo = process_memo(line) - amount = process_amount(line) - import_id = f"YNAB:{amount}:{date}" - transaction_ids[import_id] += 1 - occurrence = transaction_ids[import_id] - import_id = f"{import_id}:{occurrence}" - ynab_transaction = { - "date": date, - "amount": amount, - "payee_name": payee, - "memo": memo, - "import_id": import_id, - } - return ynab_transaction - - -def process_date(transaction): - return transaction.dtposted.isoformat().split("T")[0] - - -def process_amount(transaction): - return int(transaction.trnamt * 1000) - - -if __name__ == "__main__": - cli() diff --git a/ofx_processor/revolut_processor/main.py b/ofx_processor/revolut_processor/main.py index 6b40b26..4b72db8 100644 --- a/ofx_processor/revolut_processor/main.py +++ b/ofx_processor/revolut_processor/main.py @@ -1,82 +1,18 @@ import csv -from collections import defaultdict import click -import dateparser +from ofx_processor.revolut_processor.revolut_processor import RevolutProcessor from ofx_processor.utils import ynab @click.command(help="Process Revolut bank statement (CSV)") @click.argument("csv_filename") def cli(csv_filename): - ynab_transactions = [] - transaction_ids = defaultdict(int) - with open(csv_filename) as f: reader = csv.DictReader(f, delimiter=";") - for line in reader: - ynab_transaction = line_to_ynab_transaction(line, transaction_ids) - ynab_transactions.append(ynab_transaction) + processor = RevolutProcessor(reader) + ynab_transactions = processor.get_transactions() + click.secho(f"Processed {len(ynab_transactions)} transactions total.", fg="blue") - ynab.push_transactions(ynab_transactions, "revolut") - - -def _amount_str_to_float(amount): - if amount: - return float(amount.replace(",", ".")) - return "" - - -def process_memo(line): - return " - ".join( - filter( - None, - map(str.strip, [line.get("Category", ""), line.get("Exchange Rate", "")]), - ) - ) - - -def process_date(line): - return dateparser.parse(line.get("Completed Date")).strftime("%Y-%m-%d") - - -def _process_inflow(line): - return _amount_str_to_float(line.get("Paid In (EUR)")) - - -def _process_outflow(line): - return _amount_str_to_float(line.get("Paid Out (EUR)")) - - -def process_amount(line): - outflow = _process_outflow(line) - inflow = _process_inflow(line) - amount = -outflow if outflow else inflow - amount = int(amount * 1000) - return amount - - -def line_to_ynab_transaction(line, transaction_ids): - date = process_date(line) - payee = process_payee(line) - memo = process_memo(line) - amount = process_amount(line) - import_id = f"YNAB:{amount}:{date}" - transaction_ids[import_id] += 1 - occurrence = transaction_ids[import_id] - import_id = f"{import_id}:{occurrence}" - ynab_transaction = { - "date": date, - "amount": amount, - "payee_name": payee, - "memo": memo, - "import_id": import_id, - } - return ynab_transaction - - -def process_payee(line): - payee = line["Reference"] - return payee diff --git a/ofx_processor/revolut_processor/revolut_processor.py b/ofx_processor/revolut_processor/revolut_processor.py new file mode 100644 index 0000000..12be251 --- /dev/null +++ b/ofx_processor/revolut_processor/revolut_processor.py @@ -0,0 +1,48 @@ +import dateparser + +from ofx_processor.utils.processor import Processor, Line + + +def _amount_str_to_float(amount): + if amount: + return float(amount.replace(",", ".")) + return "" + + +class RevolutLine(Line): + def __init__(self, data: dict = None): + super(RevolutLine, self).__init__(data) + + def _process_inflow(self): + return _amount_str_to_float(self.data.get("Paid In (EUR)")) + + def _process_outflow(self): + return _amount_str_to_float(self.data.get("Paid Out (EUR)")) + + def get_date(self): + return dateparser.parse(self.data.get("Completed Date")).strftime("%Y-%m-%d") + + def get_amount(self): + outflow = self._process_outflow() + inflow = self._process_inflow() + amount = -outflow if outflow else inflow + amount = int(amount * 1000) + return amount + + def get_memo(self): + return " - ".join( + filter( + None, + map( + str.strip, + [self.data.get("Category", ""), self.data.get("Exchange Rate", "")], + ), + ) + ) + + def get_payee(self): + return self.data.get("Reference") + + +class RevolutProcessor(Processor): + line_class = RevolutLine diff --git a/ofx_processor/utils/processor.py b/ofx_processor/utils/processor.py new file mode 100644 index 0000000..1d1cd06 --- /dev/null +++ b/ofx_processor/utils/processor.py @@ -0,0 +1,53 @@ +from collections import defaultdict + + +class Line: + data = None + + def __init__(self, data=None): + self.data = data + + def get_date(self): + pass + + def get_amount(self): + pass + + def get_memo(self): + pass + + def get_payee(self): + pass + + def to_ynab_transaction(self, transaction_ids): + date = self.get_date() + payee = self.get_payee() + memo = self.get_memo() + amount = self.get_amount() + import_id = f"YNAB:{amount}:{date}" + transaction_ids[import_id] += 1 + occurrence = transaction_ids[import_id] + import_id = f"{import_id}:{occurrence}" + ynab_transaction = { + "date": date, + "amount": amount, + "payee_name": payee, + "memo": memo, + "import_id": import_id, + } + return ynab_transaction + + +class Processor: + transaction_ids = defaultdict(int) + line_class = Line + + def __init__(self, iterable): + self.iterable = iterable + + def get_transactions(self): + return list(map(self._get_transaction, self.iterable)) + + def _get_transaction(self, data): + line = self.line_class(data) + return line.to_ynab_transaction(self.transaction_ids) diff --git a/tests/test_bpvf_processor.py b/tests/test_bpvf_processor.py index 9dd8d53..f7c3e21 100644 --- a/tests/test_bpvf_processor.py +++ b/tests/test_bpvf_processor.py @@ -1,6 +1,6 @@ import unittest -from ofx_processor.bpvf_processor.main import _process_name_and_memo +from ofx_processor.bpvf_processor.bpvf_processor import _process_name_and_memo class MyTestCase(unittest.TestCase): @@ -8,8 +8,7 @@ class MyTestCase(unittest.TestCase): name = "business" memo = "2020-01-17" - result_name, result_memo, edited = _process_name_and_memo(name, memo) - self.assertFalse(edited) + result_name, result_memo = _process_name_and_memo(name, memo) self.assertEqual(result_name, name) self.assertEqual(result_memo, memo) @@ -20,8 +19,7 @@ class MyTestCase(unittest.TestCase): expected_name = "GUY AND SONS FR LYON" expected_memo = "150120 CB****5874 0,90EUR 1 EURO = 1,000000" - result_name, result_memo, edited = _process_name_and_memo(name, memo) - self.assertTrue(edited) + result_name, result_memo = _process_name_and_memo(name, memo) self.assertEqual(result_name, expected_name) self.assertEqual(result_memo, expected_memo) @@ -32,8 +30,7 @@ class MyTestCase(unittest.TestCase): expected_name = "Dott 75PARIS" expected_memo = "150120 CB****5874" - result_name, result_memo, edited = _process_name_and_memo(name, memo) - self.assertTrue(edited) + result_name, result_memo = _process_name_and_memo(name, memo) self.assertEqual(result_name, expected_name) self.assertEqual(result_memo, expected_memo) diff --git a/tests/test_revolut_processor.py b/tests/test_revolut_processor.py index 06bebd7..63750be 100644 --- a/tests/test_revolut_processor.py +++ b/tests/test_revolut_processor.py @@ -1,12 +1,9 @@ import datetime import unittest -from ofx_processor.revolut_processor.main import ( +from ofx_processor.revolut_processor.revolut_processor import ( _amount_str_to_float, - process_memo, - process_date, - _process_inflow, - _process_outflow, + RevolutLine, ) @@ -29,43 +26,43 @@ class RevolutProcessorTestCase(unittest.TestCase): def test_process_memo_with_category_and_rate(self): line = {"Category": "category name", "Exchange Rate": "exchange rate"} expected = "category name - exchange rate" - self.assertEqual(process_memo(line), expected) + self.assertEqual(RevolutLine(line).get_memo(), expected) def test_process_memo_with_only_category(self): line = {"Category": "category name", "Exchange Rate": ""} expected = "category name" - self.assertEqual(process_memo(line), expected) + self.assertEqual(RevolutLine(line).get_memo(), expected) def test_process_memo_with_only_rate(self): line = {"Category": "", "Exchange Rate": "exchange rate"} expected = "exchange rate" - self.assertEqual(process_memo(line), expected) + self.assertEqual(RevolutLine(line).get_memo(), expected) def test_process_memo_with_missing_keys(self): line = {"Category": "category name"} expected = "category name" - self.assertEqual(process_memo(line), expected) + self.assertEqual(RevolutLine(line).get_memo(), expected) def test_process_date(self): line = {"Completed Date": "January 16"} current_year = datetime.date.today().year expected = f"{current_year}-01-16" - self.assertEqual(process_date(line), expected) + self.assertEqual(RevolutLine(line).get_date(), expected) def test_process_date_other_year(self): line = {"Completed Date": "January 16 2019"} expected = f"2019-01-16" - self.assertEqual(process_date(line), expected) + self.assertEqual(RevolutLine(line).get_date(), expected) def test_process_inflow(self): line = {"Paid In (EUR)": "3,42"} expected = 3.42 - self.assertEqual(_process_inflow(line), expected) + self.assertEqual(RevolutLine(line)._process_inflow(), expected) def test_process_outflow(self): line = {"Paid Out (EUR)": "8,42"} expected = 8.42 - self.assertEqual(_process_outflow(line), expected) + self.assertEqual(RevolutLine(line)._process_outflow(), expected) if __name__ == "__main__":