forked from gaugendre/ofx-processor
Refactor to expose a common interface and have a single ynab command
This commit is contained in:
parent
95c1ac6b43
commit
743b798222
6 changed files with 136 additions and 164 deletions
|
@ -1,54 +1,16 @@
|
||||||
import os
|
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from xml.etree import ElementTree
|
|
||||||
|
|
||||||
import click
|
import click
|
||||||
from ofxtools.Parser import OFXTree
|
from ofxtools.Parser import OFXTree
|
||||||
from ofxtools.header import make_header
|
|
||||||
|
|
||||||
from ofx_processor.utils import ynab
|
from ofx_processor.utils import ynab
|
||||||
|
|
||||||
|
|
||||||
def _process_name_and_memo(name, memo):
|
@click.command(help="Process BPVF bank statement (OFX)")
|
||||||
if "CB****" in name:
|
|
||||||
conversion = re.compile(r"\d+,\d{2}[a-zA-Z]{3}")
|
|
||||||
match = conversion.search(memo)
|
|
||||||
if match:
|
|
||||||
res_name = memo[: match.start() - 1]
|
|
||||||
res_memo = name + memo[match.start() - 1 :]
|
|
||||||
else:
|
|
||||||
res_name = memo
|
|
||||||
res_memo = name
|
|
||||||
|
|
||||||
return res_name, res_memo, True
|
|
||||||
|
|
||||||
return name, memo, False
|
|
||||||
|
|
||||||
|
|
||||||
def process_name_and_memo(transaction):
|
|
||||||
return _process_name_and_memo(transaction.name, transaction.memo)
|
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
|
||||||
@click.version_option()
|
|
||||||
@click.argument("ofx_filename")
|
@click.argument("ofx_filename")
|
||||||
@click.option(
|
def cli(ofx_filename):
|
||||||
"--ynab/--no-ynab",
|
|
||||||
"push_to_ynab",
|
|
||||||
default=True,
|
|
||||||
help="Push data directly to YNAB.",
|
|
||||||
show_default=True,
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--file/--no-file",
|
|
||||||
"output_file",
|
|
||||||
default=False,
|
|
||||||
help="Write a processed file.",
|
|
||||||
show_default=True,
|
|
||||||
)
|
|
||||||
def cli(ofx_filename, push_to_ynab, output_file):
|
|
||||||
parser = OFXTree()
|
parser = OFXTree()
|
||||||
try:
|
try:
|
||||||
parser.parse(ofx_filename)
|
parser.parse(ofx_filename)
|
||||||
|
@ -66,45 +28,62 @@ def cli(ofx_filename, push_to_ynab, output_file):
|
||||||
transaction_ids = defaultdict(int)
|
transaction_ids = defaultdict(int)
|
||||||
|
|
||||||
for transaction in ofx.statements[0].transactions:
|
for transaction in ofx.statements[0].transactions:
|
||||||
transaction.name, transaction.memo, edited = process_name_and_memo(transaction)
|
ynab_transaction = line_to_ynab_transaction(transaction, transaction_ids)
|
||||||
|
ynab_transactions.append(ynab_transaction)
|
||||||
|
click.secho(f"Processed {len(ynab_transactions)} transactions total.", fg="blue")
|
||||||
|
|
||||||
if edited:
|
ynab.push_transactions(ynab_transactions, "bpvf")
|
||||||
click.secho(
|
|
||||||
"Edited transaction {} ({})".format(
|
|
||||||
transaction.checknum, transaction.name
|
|
||||||
),
|
|
||||||
fg="blue",
|
|
||||||
)
|
|
||||||
|
|
||||||
date = transaction.dtposted.isoformat().split("T")[0]
|
|
||||||
amount = int(transaction.trnamt * 1000)
|
def _process_name_and_memo(name, memo):
|
||||||
|
if "CB****" in name:
|
||||||
|
conversion = re.compile(r"\d+,\d{2}[a-zA-Z]{3}")
|
||||||
|
match = conversion.search(memo)
|
||||||
|
if match:
|
||||||
|
res_name = memo[: match.start() - 1]
|
||||||
|
res_memo = name + memo[match.start() - 1 :]
|
||||||
|
else:
|
||||||
|
res_name = memo
|
||||||
|
res_memo = name
|
||||||
|
|
||||||
|
return res_name, res_memo
|
||||||
|
|
||||||
|
return name, memo
|
||||||
|
|
||||||
|
|
||||||
|
def process_payee(line):
|
||||||
|
return _process_name_and_memo(line.name, line.memo)[0]
|
||||||
|
|
||||||
|
|
||||||
|
def process_memo(line):
|
||||||
|
return _process_name_and_memo(line.name, line.memo)[1]
|
||||||
|
|
||||||
|
|
||||||
|
def line_to_ynab_transaction(line, transaction_ids):
|
||||||
|
date = process_date(line)
|
||||||
|
payee = process_payee(line)
|
||||||
|
memo = process_memo(line)
|
||||||
|
amount = process_amount(line)
|
||||||
import_id = f"YNAB:{amount}:{date}"
|
import_id = f"YNAB:{amount}:{date}"
|
||||||
transaction_ids[import_id] += 1
|
transaction_ids[import_id] += 1
|
||||||
occurrence = transaction_ids[import_id]
|
occurrence = transaction_ids[import_id]
|
||||||
import_id = f"{import_id}:{occurrence}"
|
import_id = f"{import_id}:{occurrence}"
|
||||||
|
ynab_transaction = {
|
||||||
ynab_transactions.append(
|
|
||||||
{
|
|
||||||
"date": date,
|
"date": date,
|
||||||
"amount": amount,
|
"amount": amount,
|
||||||
"payee_name": transaction.name,
|
"payee_name": payee,
|
||||||
"memo": transaction.memo,
|
"memo": memo,
|
||||||
"import_id": import_id,
|
"import_id": import_id,
|
||||||
}
|
}
|
||||||
)
|
return ynab_transaction
|
||||||
click.secho(f"Processed {len(ynab_transactions)} transactions total.", fg="blue")
|
|
||||||
|
|
||||||
if output_file:
|
|
||||||
header = str(make_header(version=102))
|
|
||||||
root = ofx.to_etree()
|
|
||||||
data = ElementTree.tostring(root).decode()
|
|
||||||
processed_file = os.path.join(os.path.dirname(ofx_filename), "processed.ofx")
|
|
||||||
with open(processed_file, "w") as f:
|
|
||||||
f.write(header + data)
|
|
||||||
click.secho("{} written".format(processed_file), fg="green")
|
|
||||||
|
|
||||||
if push_to_ynab and ynab_transactions:
|
def process_date(transaction):
|
||||||
ynab.push_transactions(ynab_transactions, "bpvf")
|
return transaction.dtposted.isoformat().split("T")[0]
|
||||||
|
|
||||||
|
|
||||||
|
def process_amount(transaction):
|
||||||
|
return int(transaction.trnamt * 1000)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
19
ofx_processor/main.py
Normal file
19
ofx_processor/main.py
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
import click
|
||||||
|
|
||||||
|
import ofx_processor.bpvf_processor.main as bpvf
|
||||||
|
import ofx_processor.revolut_processor.main as revolut
|
||||||
|
|
||||||
|
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||||
|
|
||||||
|
|
||||||
|
@click.group(context_settings=CONTEXT_SETTINGS)
|
||||||
|
@click.version_option()
|
||||||
|
def cli():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
cli.add_command(bpvf.cli, name="bpvf")
|
||||||
|
cli.add_command(revolut.cli, name="revolut")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
cli()
|
|
@ -1,5 +1,4 @@
|
||||||
import csv
|
import csv
|
||||||
import os
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
@ -8,7 +7,23 @@ import dateparser
|
||||||
from ofx_processor.utils import ynab
|
from ofx_processor.utils import ynab
|
||||||
|
|
||||||
|
|
||||||
def process_amount(amount):
|
@click.command(help="Process Revolut bank statement (CSV)")
|
||||||
|
@click.argument("csv_filename")
|
||||||
|
def cli(csv_filename):
|
||||||
|
ynab_transactions = []
|
||||||
|
transaction_ids = defaultdict(int)
|
||||||
|
|
||||||
|
with open(csv_filename) as f:
|
||||||
|
reader = csv.DictReader(f, delimiter=";")
|
||||||
|
for line in reader:
|
||||||
|
ynab_transaction = line_to_ynab_transaction(line, transaction_ids)
|
||||||
|
ynab_transactions.append(ynab_transaction)
|
||||||
|
click.secho(f"Processed {len(ynab_transactions)} transactions total.", fg="blue")
|
||||||
|
|
||||||
|
ynab.push_transactions(ynab_transactions, "revolut")
|
||||||
|
|
||||||
|
|
||||||
|
def _amount_str_to_float(amount):
|
||||||
if amount:
|
if amount:
|
||||||
return float(amount.replace(",", "."))
|
return float(amount.replace(",", "."))
|
||||||
return ""
|
return ""
|
||||||
|
@ -27,84 +42,41 @@ def process_date(line):
|
||||||
return dateparser.parse(line.get("Completed Date")).strftime("%Y-%m-%d")
|
return dateparser.parse(line.get("Completed Date")).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
|
||||||
def process_inflow(line):
|
def _process_inflow(line):
|
||||||
return process_amount(line.get("Paid In (EUR)"))
|
return _amount_str_to_float(line.get("Paid In (EUR)"))
|
||||||
|
|
||||||
|
|
||||||
def process_outflow(line):
|
def _process_outflow(line):
|
||||||
return process_amount(line.get("Paid Out (EUR)"))
|
return _amount_str_to_float(line.get("Paid Out (EUR)"))
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
def process_amount(line):
|
||||||
@click.version_option()
|
outflow = _process_outflow(line)
|
||||||
@click.argument("csv_filename")
|
inflow = _process_inflow(line)
|
||||||
@click.option(
|
|
||||||
"--ynab/--no-ynab",
|
|
||||||
"push_to_ynab",
|
|
||||||
default=True,
|
|
||||||
help="Push data directly to YNAB.",
|
|
||||||
show_default=True,
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--file/--no-file",
|
|
||||||
"output_file",
|
|
||||||
default=False,
|
|
||||||
help="Write a processed file.",
|
|
||||||
show_default=True,
|
|
||||||
)
|
|
||||||
def cli(csv_filename, push_to_ynab, output_file):
|
|
||||||
formatted_data = []
|
|
||||||
ynab_transactions = []
|
|
||||||
transaction_ids = defaultdict(int)
|
|
||||||
|
|
||||||
with open(csv_filename) as f:
|
|
||||||
reader = csv.DictReader(f, delimiter=";")
|
|
||||||
for line in reader:
|
|
||||||
date = process_date(line)
|
|
||||||
payee = line["Reference"]
|
|
||||||
memo = process_memo(line)
|
|
||||||
outflow = process_outflow(line)
|
|
||||||
inflow = process_inflow(line)
|
|
||||||
formatted_data.append(
|
|
||||||
{
|
|
||||||
"Date": date,
|
|
||||||
"Payee": payee,
|
|
||||||
"Memo": memo,
|
|
||||||
"Outflow": outflow,
|
|
||||||
"Inflow": inflow,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
amount = -outflow if outflow else inflow
|
amount = -outflow if outflow else inflow
|
||||||
amount = int(amount * 1000)
|
amount = int(amount * 1000)
|
||||||
|
return amount
|
||||||
|
|
||||||
|
|
||||||
|
def line_to_ynab_transaction(line, transaction_ids):
|
||||||
|
date = process_date(line)
|
||||||
|
payee = process_payee(line)
|
||||||
|
memo = process_memo(line)
|
||||||
|
amount = process_amount(line)
|
||||||
import_id = f"YNAB:{amount}:{date}"
|
import_id = f"YNAB:{amount}:{date}"
|
||||||
transaction_ids[import_id] += 1
|
transaction_ids[import_id] += 1
|
||||||
occurrence = transaction_ids[import_id]
|
occurrence = transaction_ids[import_id]
|
||||||
import_id = f"{import_id}:{occurrence}"
|
import_id = f"{import_id}:{occurrence}"
|
||||||
ynab_transactions.append(
|
ynab_transaction = {
|
||||||
{
|
|
||||||
"date": date,
|
"date": date,
|
||||||
"amount": amount,
|
"amount": amount,
|
||||||
"payee_name": payee,
|
"payee_name": payee,
|
||||||
"memo": memo,
|
"memo": memo,
|
||||||
"import_id": import_id,
|
"import_id": import_id,
|
||||||
}
|
}
|
||||||
)
|
return ynab_transaction
|
||||||
click.secho(f"Processed {len(ynab_transactions)} transactions total.", fg="blue")
|
|
||||||
|
|
||||||
if output_file and formatted_data:
|
|
||||||
processed_file = os.path.join(os.path.dirname(csv_filename), "processed.csv")
|
|
||||||
with open(processed_file, "w") as f:
|
|
||||||
writer = csv.DictWriter(
|
|
||||||
f, delimiter=",", quotechar='"', fieldnames=formatted_data[0].keys()
|
|
||||||
)
|
|
||||||
writer.writeheader()
|
|
||||||
writer.writerows(formatted_data)
|
|
||||||
|
|
||||||
click.secho("{} written".format(processed_file), fg="green")
|
|
||||||
|
|
||||||
if push_to_ynab and ynab_transactions:
|
|
||||||
ynab.push_transactions(ynab_transactions, "revolut")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
def process_payee(line):
|
||||||
cli()
|
payee = line["Reference"]
|
||||||
|
return payee
|
||||||
|
|
|
@ -22,6 +22,9 @@ def get_default_config():
|
||||||
|
|
||||||
|
|
||||||
def push_transactions(transactions, account):
|
def push_transactions(transactions, account):
|
||||||
|
if not transactions:
|
||||||
|
click.secho("No transaction, nothing to do.", fg="yellow")
|
||||||
|
return
|
||||||
config = configparser.ConfigParser()
|
config = configparser.ConfigParser()
|
||||||
config_file = os.path.join(DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME)
|
config_file = os.path.join(DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME)
|
||||||
if not os.path.isfile(config_file):
|
if not os.path.isfile(config_file):
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "ofx-processor"
|
name = "ofx-processor"
|
||||||
version = "0.4.2"
|
version = "0.5.0"
|
||||||
description = "Personal ofx processor"
|
description = "Personal ofx processor"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
license = "GPL-3.0-or-later"
|
license = "GPL-3.0-or-later"
|
||||||
|
@ -22,8 +22,7 @@ pytest = "^5.2"
|
||||||
black = "^19.10b0"
|
black = "^19.10b0"
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
process-bpvf = 'ofx_processor.bpvf_processor.main:cli'
|
ynab = 'ofx_processor.main:cli'
|
||||||
process-revolut = 'ofx_processor.revolut_processor.main:cli'
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["poetry>=0.12"]
|
requires = ["poetry>=0.12"]
|
||||||
|
|
|
@ -2,11 +2,11 @@ import datetime
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from ofx_processor.revolut_processor.main import (
|
from ofx_processor.revolut_processor.main import (
|
||||||
process_amount,
|
_amount_str_to_float,
|
||||||
process_memo,
|
process_memo,
|
||||||
process_date,
|
process_date,
|
||||||
process_inflow,
|
_process_inflow,
|
||||||
process_outflow,
|
_process_outflow,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -14,17 +14,17 @@ class RevolutProcessorTestCase(unittest.TestCase):
|
||||||
def test_process_amount_with_one_decimal_place(self):
|
def test_process_amount_with_one_decimal_place(self):
|
||||||
amount = "3,4"
|
amount = "3,4"
|
||||||
expected = 3.4
|
expected = 3.4
|
||||||
self.assertEqual(process_amount(amount), expected)
|
self.assertEqual(_amount_str_to_float(amount), expected)
|
||||||
|
|
||||||
def test_process_amount_with_two_decimal_places(self):
|
def test_process_amount_with_two_decimal_places(self):
|
||||||
amount = "3,41"
|
amount = "3,41"
|
||||||
expected = 3.41
|
expected = 3.41
|
||||||
self.assertEqual(process_amount(amount), expected)
|
self.assertEqual(_amount_str_to_float(amount), expected)
|
||||||
|
|
||||||
def test_process_amount_with_empty_string(self):
|
def test_process_amount_with_empty_string(self):
|
||||||
amount = ""
|
amount = ""
|
||||||
expected = ""
|
expected = ""
|
||||||
self.assertEqual(process_amount(amount), expected)
|
self.assertEqual(_amount_str_to_float(amount), expected)
|
||||||
|
|
||||||
def test_process_memo_with_category_and_rate(self):
|
def test_process_memo_with_category_and_rate(self):
|
||||||
line = {"Category": "category name", "Exchange Rate": "exchange rate"}
|
line = {"Category": "category name", "Exchange Rate": "exchange rate"}
|
||||||
|
@ -60,12 +60,12 @@ class RevolutProcessorTestCase(unittest.TestCase):
|
||||||
def test_process_inflow(self):
|
def test_process_inflow(self):
|
||||||
line = {"Paid In (EUR)": "3,42"}
|
line = {"Paid In (EUR)": "3,42"}
|
||||||
expected = 3.42
|
expected = 3.42
|
||||||
self.assertEqual(process_inflow(line), expected)
|
self.assertEqual(_process_inflow(line), expected)
|
||||||
|
|
||||||
def test_process_outflow(self):
|
def test_process_outflow(self):
|
||||||
line = {"Paid Out (EUR)": "8,42"}
|
line = {"Paid Out (EUR)": "8,42"}
|
||||||
expected = 8.42
|
expected = 8.42
|
||||||
self.assertEqual(process_outflow(line), expected)
|
self.assertEqual(_process_outflow(line), expected)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
Loading…
Reference in a new issue