shortener/shortener/handlers.py

80 lines
2.6 KiB
Python

import os
import re
from typing import Dict
from urllib.parse import urlparse
from flask import request
from shortener.api_connectors.base_api_connector import BaseApiConnector
from shortener.api_connectors.ovh.ovh_api import OvhApi
from shortener.cache import cache
from shortener.data_structures import UserInput
from shortener.exceptions import BadRequestException, ShortCodeNotFoundException
from shortener.resolver import resolver
# Registry of the API connector classes this service can use, keyed by the
# lower-case provider name.
SUPPORTED_PROVIDERS: Dict[str, type] = {
    "ovh": OvhApi,
}

# The provider is selected once, at import time, from the PROVIDER environment
# variable (matched case-insensitively). A missing variable or a name that is
# not registered above surfaces as a KeyError while the module is imported.
PROVIDER_NAME = os.environ["PROVIDER"].lower()
PROVIDER_CLASS = SUPPORTED_PROVIDERS[PROVIDER_NAME]

# Marker value that handle_shorten stores in the cache under a short code.
COOLDOWN = "cooldown"
def handle_shorten():
    """Handle a request to create a short URL.

    Steps, in order:
      1. Validate the incoming request via ``_validate_user_input``.
      2. Reject the request if the cache holds an entry for the short code.
      3. Ask the configured provider to add the record.
      4. Store the cooldown marker in the cache for that short code.

    Returns:
        A dict with ``"status": "ok"`` and the resulting short ``"url"``.

    Raises:
        BadRequestException: if validation fails or the short code still has
            a cooldown entry in the cache.
    """
    user_input: UserInput = _validate_user_input()

    # Any truthy cache entry under this short code means a submission for it
    # was accepted recently; refuse until that entry is gone.
    if cache.get(user_input.short_code):
        raise BadRequestException(
            message=f"Please wait before submitting again for {user_input.short_code}."
        )

    connector: BaseApiConnector = _get_provider()
    connector.add_record(user_input)

    # Start the cooldown only once the record has been added.
    # NOTE(review): `ex=90` is presumably an expiry in seconds - confirm
    # against the cache client's API.
    cache.set(user_input.short_code, COOLDOWN, ex=90)

    response = {"status": "ok", "url": user_input.short_url}
    return response
def _validate_user_input() -> UserInput:
    """Read and validate the payload of the current shorten request.

    The payload is taken from the JSON body when one can be parsed, and from
    the submitted form fields otherwise.

    Returns:
        A ``UserInput`` built from the validated ``short_code`` and ``url``.

    Raises:
        BadRequestException: if the body is not an object/form, if ``url`` is
            missing, not a string, or lacks a scheme or network location, if
            ``short_code`` is missing, shorter than 4 characters, contains
            characters other than ASCII letters, digits, ``-`` and ``_``, or
            is already taken.
    """
    # `silent=True` yields None for a missing, non-JSON or malformed body
    # instead of aborting the request, so the form fallback stays reachable.
    user_input = request.get_json(silent=True)
    if not user_input:
        user_input = request.form

    # A JSON body may legally be a list, string or number; only mapping-like
    # payloads expose the fields read below.
    if not hasattr(user_input, "get"):
        raise BadRequestException(
            message="Request body must be a JSON object or form data."
        )

    url = user_input.get("url")
    if not url:
        raise BadRequestException(message="`url` is required")
    # JSON can carry non-string values, which urlparse cannot handle.
    if not isinstance(url, str):
        raise BadRequestException(message="`url` must be a string")

    try:
        parsed_url = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced IPv6
        # bracket) by raising; report them like any other badly formed URL.
        parsed_url = None
    if parsed_url is None or not (parsed_url.scheme and parsed_url.netloc):
        raise BadRequestException(
            message="`url` doesn't seem well formatted. "
            "Please include at least protocol and domain name. "
            "E.g. 'https://google.com' instead of 'google.com'."
        )

    short_code = user_input.get("short_code")
    if not short_code:
        raise BadRequestException(message="`short_code` is required")
    short_code = str(short_code)

    # Codes of exactly 4 characters are accepted, so the message says
    # "at least 4" to match the check.
    if len(short_code) < 4:
        raise BadRequestException(
            message="`short_code` must be at least 4 characters long."
        )

    # fullmatch, unlike `^...$` with match, does not let a trailing newline
    # slip through.
    if not re.fullmatch(r"[a-zA-Z0-9_-]+", short_code):
        raise BadRequestException(
            message="`short_code` must only contain non accentuated letters, "
            "digits, dashes (-) and underscores (_)."
        )

    if _short_code_exists(short_code):
        raise BadRequestException(message=f"`{short_code}` is already taken")

    return UserInput(short_code=short_code, url=url)
def _get_provider() -> BaseApiConnector:
    """Return a new instance of the connector class held in ``PROVIDER_CLASS``."""
    connector = PROVIDER_CLASS()
    return connector
def _short_code_exists(short_code):
    """Tell whether the resolver can resolve ``short_code``.

    Returns:
        ``True`` when ``resolver.resolve_shortcode`` completes without
        raising, ``False`` when it raises ``ShortCodeNotFoundException``.
        Any other exception propagates to the caller.
    """
    try:
        resolver.resolve_shortcode(short_code)
    except ShortCodeNotFoundException:
        return False
    else:
        return True