Delete file by default and add keep option + regroup decorators

This commit is contained in:
Gabriel Augendre 2020-05-08 18:02:25 +02:00
parent 87b6ce242e
commit b818d256e7
No known key found for this signature in database
GPG key ID: 1E693F4CE4AEE7B4
8 changed files with 111 additions and 58 deletions

View file

@ -1,7 +1,5 @@
import re import re
import click
from ofx_processor.utils.base_ofx import OfxBaseLine, OfxBaseProcessor from ofx_processor.utils.base_ofx import OfxBaseLine, OfxBaseProcessor
@ -32,10 +30,9 @@ class BpvfLine(OfxBaseLine):
class BpvfProcessor(OfxBaseProcessor): class BpvfProcessor(OfxBaseProcessor):
line_class = BpvfLine line_class = BpvfLine
account_name = "bpvf" account_name = "bpvf"
command_name = "bpvf"
@staticmethod
@click.command("bpvf") def main(filename, keep):
@click.argument("ofx_filename")
def main(ofx_filename):
"""Import BPVF bank statement (OFX file).""" """Import BPVF bank statement (OFX file)."""
BpvfProcessor(ofx_filename).push_to_ynab() BpvfProcessor(filename).push_to_ynab(keep)

View file

@ -1,7 +1,5 @@
import re import re
import click
from ofx_processor.utils.base_ofx import OfxBaseProcessor, OfxBaseLine from ofx_processor.utils.base_ofx import OfxBaseProcessor, OfxBaseLine
@ -28,11 +26,10 @@ class CeLine(OfxBaseLine):
class CeProcessor(OfxBaseProcessor): class CeProcessor(OfxBaseProcessor):
account_name = "ce" account_name = "ce"
command_name = "ce"
line_class = CeLine line_class = CeLine
@staticmethod
@click.command("ce") def main(filename, keep):
@click.argument("ofx_filename")
def main(ofx_filename):
"""Import CE bank statement (OFX file).""" """Import CE bank statement (OFX file)."""
CeProcessor(ofx_filename).push_to_ynab() CeProcessor(filename).push_to_ynab(keep)

View file

@ -12,6 +12,7 @@ class LclLine(OfxBaseLine):
class LclProcessor(OfxBaseProcessor): class LclProcessor(OfxBaseProcessor):
line_class = LclLine line_class = LclLine
account_name = "lcl" account_name = "lcl"
command_name = "lcl"
def parse_file(self): def parse_file(self):
# The first line of this file needs to be removed. # The first line of this file needs to be removed.
@ -35,9 +36,7 @@ class LclProcessor(OfxBaseProcessor):
return transactions return transactions
@staticmethod
@click.command("lcl") def main(filename, keep):
@click.argument("ofx_filename")
def main(ofx_filename):
"""Import LCL bank statement (OFX file).""" """Import LCL bank statement (OFX file)."""
LclProcessor(ofx_filename).push_to_ynab() LclProcessor(filename).push_to_ynab(keep)

View file

@ -48,6 +48,7 @@ class RevolutLine(BaseLine):
class RevolutProcessor(BaseProcessor): class RevolutProcessor(BaseProcessor):
line_class = RevolutLine line_class = RevolutLine
account_name = "revolut" account_name = "revolut"
command_name = "revolut"
def parse_file(self): def parse_file(self):
try: try:
@ -58,9 +59,7 @@ class RevolutProcessor(BaseProcessor):
click.secho("File not found", fg="red") click.secho("File not found", fg="red")
sys.exit(1) sys.exit(1)
@staticmethod
@click.command("revolut") def main(filename, keep):
@click.argument("csv_filename")
def main(csv_filename):
"""Import Revolut bank statement (CSV file).""" """Import Revolut bank statement (CSV file)."""
RevolutProcessor(csv_filename).push_to_ynab() RevolutProcessor(filename).push_to_ynab(keep)

View file

@ -1,3 +1,4 @@
import os
from collections import defaultdict from collections import defaultdict
import click import click
@ -54,10 +55,12 @@ class BaseProcessor:
def parse_file(self): def parse_file(self):
return [] # pragma: nocover return [] # pragma: nocover
def push_to_ynab(self): def push_to_ynab(self, keep=True):
transactions = self.get_transactions() transactions = self.get_transactions()
click.secho(f"Processed {len(transactions)} transactions total.", fg="blue") click.secho(f"Processed {len(transactions)} transactions total.", fg="blue")
ynab.push_transactions(transactions, account=self.account_name) ynab.push_transactions(transactions, account=self.account_name)
if not keep:
os.unlink(self.filename)
def get_transactions(self): def get_transactions(self):
return list(map(self._get_transaction, self.iterable)) return list(map(self._get_transaction, self.iterable))

View file

@ -14,8 +14,11 @@ def discover_processors(cli: click.Group):
To be discovered, processors must: To be discovered, processors must:
* Be in the `processors` package. * Be in the `processors` package.
* Declare a <BankName>Processor class * Declare a <BankName>Processor class
* Declare a static main function in this class, * Declare a main function in the module, outside of the class.
which must be a click command The main function must not be a click command, decorators will be added on the fly.
The main function must accept two parameters:
* filename: str, containing the name of the file to process, as passed on the command line
* keep: boolean, whether to keep the file after processing it or not
:param cli: The main CLI to add discovered processors to. :param cli: The main CLI to add discovered processors to.
""" """
@ -29,7 +32,25 @@ def discover_processors(cli: click.Group):
and "Base" not in item and "Base" not in item
): ):
cls = getattr(module, item) cls = getattr(module, item)
cli.add_command(cls.main) assert hasattr(
module, "main"
), "There must be a main function in the processor module."
assert hasattr(
cls, "command_name"
), "You must add a command_name member to your processor class."
# Apply default decorators
method = getattr(module, "main")
method = click.option(
"--keep/--no-keep",
help="Keep the file after processing it.",
default=False,
show_default=True,
)(method)
method = click.argument("filename")(method)
method = click.command(cls.command_name)(method)
cli.add_command(method)
class OrderedGroup(click.Group): class OrderedGroup(click.Group):

View file

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "ofx-processor" name = "ofx-processor"
version = "1.1.1" version = "2.0.0"
description = "Personal ofx processor" description = "Personal ofx processor"
readme = "README.md" readme = "README.md"
license = "GPL-3.0-or-later" license = "GPL-3.0-or-later"

View file

@ -1,15 +1,12 @@
import json import json
import os import os
import shutil
import unittest import unittest
from unittest import mock from unittest import mock
from unittest.mock import call from unittest.mock import call
from click.testing import CliRunner from click.testing import CliRunner
from ofx_processor.processors.bpvf import BpvfProcessor
from ofx_processor.processors.ce import CeProcessor
from ofx_processor.processors.lcl import LclProcessor
from ofx_processor.processors.revolut import RevolutProcessor
from ofx_processor.utils import utils from ofx_processor.utils import utils
from ofx_processor.utils import ynab from ofx_processor.utils import ynab
from ofx_processor.utils.ynab import config from ofx_processor.utils.ynab import config
@ -21,25 +18,19 @@ class UtilsTestCase(unittest.TestCase):
ofx_processor.main.cli because it tests import time stuff. ofx_processor.main.cli because it tests import time stuff.
""" """
@staticmethod def test_discover_processors(self):
def test_discover_processors(): expected_names = ["config", "bpvf", "lcl", "ce", "revolut"]
ce_main = CeProcessor.main
bpvf_main = BpvfProcessor.main
revolut_main = RevolutProcessor.main
lcl_main = LclProcessor.main
runner = CliRunner() runner = CliRunner()
with mock.patch("click.core.Group.add_command") as add_command: with mock.patch("click.core.Group.add_command") as add_command:
from ofx_processor.main import cli from ofx_processor.main import cli
calls = [call(config, name="config")]
runner.invoke(cli, ["--help"]) runner.invoke(cli, ["--help"])
calls = [
call(ce_main),
call(bpvf_main),
call(revolut_main),
call(lcl_main),
call(config, name="config"),
]
add_command.assert_has_calls(calls, any_order=True) add_command.assert_has_calls(calls, any_order=True)
self.assertEqual(
set(map(lambda item: item.args[0].name, add_command.call_args_list)),
set(expected_names),
)
class ConfigTestCase(unittest.TestCase): class ConfigTestCase(unittest.TestCase):
@ -83,7 +74,7 @@ class ConfigTestCase(unittest.TestCase):
with mock.patch("ofx_processor.utils.ynab.DEFAULT_CONFIG_FILENAME", name): with mock.patch("ofx_processor.utils.ynab.DEFAULT_CONFIG_FILENAME", name):
runner = CliRunner() runner = CliRunner()
result = runner.invoke( result = runner.invoke(
self.cli, ["revolut", "tests/samples/revolut.csv"] self.cli, ["revolut", "tests/samples/revolut.csv", "--keep"]
) )
expected_filename = ynab.get_config_file_name() expected_filename = ynab.get_config_file_name()
self.assertIn("Error while parsing config file", result.output) self.assertIn("Error while parsing config file", result.output)
@ -107,7 +98,9 @@ class ConfigTestCase(unittest.TestCase):
def test_missing_config_file(self, makedirs, edit): def test_missing_config_file(self, makedirs, edit):
runner = CliRunner() runner = CliRunner()
with mock.patch("ofx_processor.utils.ynab.open", mock.mock_open()) as mock_file: with mock.patch("ofx_processor.utils.ynab.open", mock.mock_open()) as mock_file:
result = runner.invoke(self.cli, ["revolut", "tests/samples/revolut.csv"]) result = runner.invoke(
self.cli, ["revolut", "tests/samples/revolut.csv", "--keep"]
)
mock_file.assert_called_once() mock_file.assert_called_once()
makedirs.assert_called_once_with(ynab.DEFAULT_CONFIG_DIR, exist_ok=True) makedirs.assert_called_once_with(ynab.DEFAULT_CONFIG_DIR, exist_ok=True)
self.assertIn("Editing config file", result.output) self.assertIn("Editing config file", result.output)
@ -128,6 +121,44 @@ class DataTestCase(unittest.TestCase):
utils.discover_processors(cli) utils.discover_processors(cli)
self.cli = cli self.cli = cli
@mock.patch("requests.post")
def test_file_is_deleted_by_default(self, *args):
runner = CliRunner()
revolut_csv = "tests/samples/revolut.csv"
backup_file = "tests/samples/backup.csv"
# backup the file, other tests will need it
shutil.copyfile(revolut_csv, backup_file)
try:
self.assertTrue(os.path.isfile(revolut_csv))
result = runner.invoke(self.cli, ["revolut", revolut_csv])
self.assertEqual(result.exit_code, 0)
self.assertFalse(os.path.isfile(revolut_csv))
finally:
# restore the file so other tests can rely on it, whatever happens
shutil.copyfile(backup_file, revolut_csv)
os.unlink(backup_file)
@mock.patch("requests.post")
def test_file_is_kept_with_option(self, *args):
runner = CliRunner()
revolut_csv = "tests/samples/revolut.csv"
backup_file = "tests/samples/backup.csv"
# backup the file, other tests will need it
shutil.copyfile(revolut_csv, backup_file)
try:
self.assertTrue(os.path.isfile(revolut_csv))
result = runner.invoke(self.cli, ["revolut", revolut_csv, "--keep"])
self.assertEqual(result.exit_code, 0)
self.assertTrue(os.path.isfile(revolut_csv))
finally:
# restore the file so other tests can rely on it, whatever happens
shutil.copyfile(backup_file, revolut_csv)
os.unlink(backup_file)
@mock.patch("requests.post") @mock.patch("requests.post")
def test_revolut_sends_data_only_created(self, post): def test_revolut_sends_data_only_created(self, post):
post.return_value.json.return_value = { post.return_value.json.return_value = {
@ -162,7 +193,9 @@ class DataTestCase(unittest.TestCase):
} }
runner = CliRunner() runner = CliRunner()
result = runner.invoke(self.cli, ["revolut", "tests/samples/revolut.csv"]) result = runner.invoke(
self.cli, ["revolut", "tests/samples/revolut.csv", "--keep"]
)
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
self.assertIn("Processed 9 transactions total.", result.output) self.assertIn("Processed 9 transactions total.", result.output)
@ -204,7 +237,9 @@ class DataTestCase(unittest.TestCase):
} }
runner = CliRunner() runner = CliRunner()
result = runner.invoke(self.cli, ["revolut", "tests/samples/revolut.csv"]) result = runner.invoke(
self.cli, ["revolut", "tests/samples/revolut.csv", "--keep"]
)
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
self.assertIn("Processed 9 transactions total.", result.output) self.assertIn("Processed 9 transactions total.", result.output)
@ -231,7 +266,9 @@ class DataTestCase(unittest.TestCase):
} }
runner = CliRunner() runner = CliRunner()
result = runner.invoke(self.cli, ["revolut", "tests/samples/revolut.csv"]) result = runner.invoke(
self.cli, ["revolut", "tests/samples/revolut.csv", "--keep"]
)
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
self.assertIn("Processed 9 transactions total.", result.output) self.assertIn("Processed 9 transactions total.", result.output)
@ -247,7 +284,7 @@ class DataTestCase(unittest.TestCase):
expected_url = f"{ynab.BASE_URL}/budgets/<YOUR BUDGET ID>/transactions" expected_url = f"{ynab.BASE_URL}/budgets/<YOUR BUDGET ID>/transactions"
runner = CliRunner() runner = CliRunner()
runner.invoke(self.cli, ["bpvf", "tests/samples/bpvf.ofx"]) runner.invoke(self.cli, ["bpvf", "tests/samples/bpvf.ofx", "--keep"])
post.assert_called_once_with( post.assert_called_once_with(
expected_url, json=expected_data, headers=expected_headers expected_url, json=expected_data, headers=expected_headers
@ -262,7 +299,7 @@ class DataTestCase(unittest.TestCase):
expected_url = f"{ynab.BASE_URL}/budgets/<YOUR BUDGET ID>/transactions" expected_url = f"{ynab.BASE_URL}/budgets/<YOUR BUDGET ID>/transactions"
runner = CliRunner() runner = CliRunner()
runner.invoke(self.cli, ["ce", "tests/samples/ce.ofx"]) runner.invoke(self.cli, ["ce", "tests/samples/ce.ofx", "--keep"])
post.assert_called_once_with( post.assert_called_once_with(
expected_url, json=expected_data, headers=expected_headers expected_url, json=expected_data, headers=expected_headers
@ -277,7 +314,7 @@ class DataTestCase(unittest.TestCase):
expected_url = f"{ynab.BASE_URL}/budgets/<YOUR BUDGET ID>/transactions" expected_url = f"{ynab.BASE_URL}/budgets/<YOUR BUDGET ID>/transactions"
runner = CliRunner() runner = CliRunner()
runner.invoke(self.cli, ["lcl", "tests/samples/lcl.ofx"]) runner.invoke(self.cli, ["lcl", "tests/samples/lcl.ofx", "--keep"])
post.assert_called_once_with( post.assert_called_once_with(
expected_url, json=expected_data, headers=expected_headers expected_url, json=expected_data, headers=expected_headers