ofx-processor/ofx_processor/utils/ynab.py

45 lines
1.3 KiB
Python
Raw Normal View History

2020-02-08 19:07:05 +01:00
import click
import requests
2021-11-20 15:00:43 +01:00
from ofx_processor.utils.config import get_config
2020-02-22 15:34:40 +01:00
2021-11-20 15:00:43 +01:00
# Root URL of the YNAB API (v1); endpoint paths are appended to it.
BASE_URL = "https://api.youneedabudget.com/v1"
2020-02-22 15:34:40 +01:00
2020-02-08 19:07:05 +01:00
def push_transactions(transactions, account, timeout=30):
    """Push transactions to YNAB and print a summary of the outcome.

    Every transaction is tagged in place with the configured YNAB account id
    and marked as ``"cleared"``, then the whole batch is sent in a single
    POST request to the budget's ``/transactions`` endpoint.

    Args:
        transactions: Sequence of transaction dicts to send. The dicts are
            mutated: ``account_id`` and ``cleared`` are set on each one.
        account: Account identifier forwarded to ``get_config``; the
            resulting config supplies ``budget_id``, ``account`` and
            ``token``.
        timeout: Seconds to wait for the YNAB API before giving up, so a
            stalled connection cannot hang the command forever.

    Returns:
        None. Every outcome (nothing to do, error, created/duplicate
        counts) is reported on the terminal through ``click.secho``.
    """
    if not transactions:
        click.secho("No transaction, nothing to do.", fg="yellow")
        return

    config = get_config(account)
    url = f"{BASE_URL}/budgets/{config.budget_id}/transactions"

    for transaction in transactions:
        transaction["account_id"] = config.account
        transaction["cleared"] = "cleared"

    data = {"transactions": transactions}
    headers = {"Authorization": f"Bearer {config.token}"}

    # Network-level failures (DNS, refused connection, timeout) are reported
    # the same way as HTTP errors instead of surfacing as a traceback.
    try:
        res = requests.post(url, json=data, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        click.secho(f"Error pushing transactions: {exc}", fg="red")
        return

    if res.status_code >= 400:
        click.secho(f"Error pushing transactions: {res.text}", fg="red")
        return

    data = res.json()["data"]

    # Count a returned transaction unless it is matched to a transaction
    # that has already been counted, so a matched pair is reported once.
    created = set()
    for transaction in data.get("transactions") or []:
        matched_id = transaction.get("matched_transaction_id")
        if not matched_id or matched_id not in created:
            created.add(transaction["id"])

    if created:
        click.secho(
            f"{len(created)} transactions created in YNAB.", fg="green", bold=True
        )

    # Import ids the API skipped; treated as empty when the key is absent.
    duplicates = data.get("duplicate_import_ids") or []
    if duplicates:
        click.secho(
            f"{len(duplicates)} transactions ignored (duplicates).", fg="yellow"
        )