# tcl-filtrage/main.py
# 2021-11-09 12:33:18 +01:00
#
# 46 lines
# 1.4 KiB
# Python

from typing import List, Optional
from fastapi import FastAPI, HTTPException
from fastapi.params import Header
from pydantic import BaseModel
import enum, httpx
# ASGI application object; the route decorator further down registers its
# handler on this instance.
app = FastAPI()
class PassageType(enum.Enum):
    """Closed set of values accepted for a passage's ``type`` field.

    Each member's value is the one-letter string used in the payload.
    """

    # NOTE(review): presumably "E" marks an estimated (real-time) passage and
    # "T" a theoretical (timetable) one -- confirm against the dataset docs.
    E = "E"
    T = "T"
class Passage(BaseModel):
    """One passage record, validated by pydantic.

    The field names mirror the keys of the records the ``stop`` handler reads
    from the remote API's ``values`` list. Field meanings are not established
    by this file; the notes below are assumptions to confirm against the
    dataset documentation.
    """

    coursetheorique: str
    delaipassage: str  # presumably the remaining delay before the passage
    direction: str
    gid: int
    heurepassage: str  # presumably the passage time, kept as a raw string
    # Compared against the requested ``stop_id`` by the ``stop`` handler.
    id: int
    idtarretdestination: int
    last_update_fme: str
    ligne: str  # presumably the line name/code
    type: PassageType
class Passages(BaseModel):
    """Response envelope: a list of ``Passage`` records under ``passages``."""

    passages: List[Passage]
# Stop ids can be obtained from:
# https://data.grandlyon.com/jeux-de-donnees/points-arret-reseau-transports-commun-lyonnais/donnees
@app.get("/stop/{stop_id}", response_model=Passages)
async def stop(stop_id: int, authorization: Optional[str] = Header(None)):
    """Return the passages whose ``id`` equals ``stop_id``.

    The caller's ``Authorization`` header is forwarded unchanged to the
    remote Grand Lyon endpoint. The complete passage list is downloaded and
    then filtered locally.

    Args:
        stop_id: Value matched against each remote record's ``id`` key.
        authorization: Content of the incoming ``Authorization`` header.

    Returns:
        A ``Passages`` model holding the matching records (possibly empty).

    Raises:
        HTTPException: 401 when no ``Authorization`` header was sent; 502
            when the remote API cannot be reached; otherwise the remote
            status code whenever the remote answers anything other than 200.
    """
    if authorization is None:
        raise HTTPException(status_code=401, detail="Not authenticated")

    headers = {'Authorization': authorization}
    try:
        async with httpx.AsyncClient(headers=headers) as client:
            res = await client.get('https://download.data.grandlyon.com/ws/rdata/tcl_sytral.tclpassagearret/all.json?maxfeatures=-1')
    except httpx.RequestError as exc:
        # Timeouts / connection failures are upstream problems, not ours:
        # report them as a gateway error instead of an unhandled 500.
        raise HTTPException(status_code=502, detail="Remote API unreachable") from exc

    if res.status_code != 200:
        raise HTTPException(status_code=res.status_code, detail="HTTP error during call to remote API")

    # A payload without "values" (or with a null one) means "no passages";
    # iterating None here would raise TypeError.
    records = res.json().get("values") or []
    matching = [record for record in records if record.get("id") == stop_id]
    # The raw dicts are validated into Passage models by Passages itself.
    return Passages(passages=matching)