tcl-filtrage/main.py

57 lines
1.5 KiB
Python

import enum
from typing import List, Optional
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.params import Header
from pydantic import BaseModel
# FastAPI application object; the route handler below is registered on it
# through the `@app.get` decorator.
app = FastAPI()
class PassageType(str, enum.Enum):
    """One-letter passage type code carried by the `type` field of a passage."""

    # Mixing in `str` makes members compare equal to (and serialize as) their
    # plain string value, and lets the generated schema describe the field as
    # a string. Member names and values are unchanged.
    # NOTE(review): presumably "E" = estimated (real-time) and "T" =
    # theoretical (timetable) -- confirm against the remote dataset docs.
    E = "E"
    T = "T"
# Schema of a single passage record.
# Instances are built from the raw records of the remote JSON payload, so the
# field names must stay exactly as declared to keep matching those keys.
# (Documented with comments rather than a docstring so the generated schema
# description is left untouched.)
class Passage(BaseModel):
    coursetheorique: str
    delaipassage: str
    direction: str
    gid: int
    heurepassage: str
    # Key the `/stop/{stop_id}` route compares with the requested stop id.
    id: int
    idtarretdestination: int
    last_update_fme: str
    ligne: str
    # Restricted to the codes declared by PassageType.
    type: PassageType
# Response body of the `/stop/{stop_id}` route: the matching passage records
# wrapped under a single `passages` key.
class Passages(BaseModel):
    passages: List[Passage]
# Stop ids can be obtained from:
# https://data.grandlyon.com/jeux-de-donnees/points-arret-reseau-transports-commun-lyonnais/donnees
@app.get("/stop/{stop_id}", response_model=Passages)
async def stop(stop_id: int, authorization: Optional[str] = Header(None)):
    """Return the passages whose `id` equals `stop_id`.

    The full passage list is downloaded from the Grand Lyon open-data API,
    forwarding the caller's `Authorization` header unchanged, and is then
    filtered locally.

    Args:
        stop_id: Value compared with the `id` key of every remote record.
        authorization: Incoming `Authorization` header, forwarded as-is to
            the remote API.

    Returns:
        A `Passages` object holding every remote record that matched.

    Raises:
        HTTPException: 401 when no `Authorization` header was sent, 502 when
            the remote API cannot be reached, or the remote status code when
            that code is not 200.
    """
    if authorization is None:
        raise HTTPException(status_code=401, detail="Not authenticated")

    url = "https://download.data.grandlyon.com/ws/rdata/tcl_sytral.tclpassagearret/all.json?maxfeatures=-1"
    try:
        async with httpx.AsyncClient(
            headers={"Authorization": authorization}
        ) as client:
            res = await client.get(url)
    except httpx.HTTPError as exc:
        # Transport failures (connection, timeout, ...) would otherwise
        # surface to the caller as an unhandled 500.
        raise HTTPException(
            status_code=502,
            detail="Unable to reach remote API",
        ) from exc

    if res.status_code != 200:
        raise HTTPException(
            status_code=res.status_code,
            detail="HTTP error during call to remote API",
        )

    # `values` may be missing or null in the payload; treat that as "no
    # passages" instead of failing when iterating over None.
    records = res.json().get("values") or []
    # Raw dicts are handed to Passages, which validates them as Passage items.
    matching = [record for record in records if record.get("id") == stop_id]
    return Passages(passages=matching)