Files
planning2ics/webapp/app.py
T
sylvain 56372af486 Fix téléchargement ICS : encodage RFC 5987 pour noms de fichiers non-ASCII
Le header Content-Disposition avec filename="Là_où_bat_le_cœur.ics"
causait une erreur 500 car les caractères non-ASCII ne sont pas valides
en HTTP. Correction avec filename*=UTF-8''<url-encoded> (RFC 5987).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 17:55:28 +01:00

318 lines
11 KiB
Python

"""
app.py - Backend FastAPI pour planning2ics web app.
"""
import asyncio
import json
import os
import secrets
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import quote
from fastapi import FastAPI, Depends, File, HTTPException, Request, Response, UploadFile, Cookie
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
import core
# ── Paths ─────────────────────────────────────────────────────────────────────
# Optional JSON configuration file; load_config() falls back to built-in
# defaults when it is absent.
CONFIG_PATH = Path("/app/config.json")
# Root of the persistent data tree (cache/, jobs/ and uploads/ live below it).
DATA_DIR = Path("/app/data")
def load_config(config_path: Optional[Path] = None) -> dict:
    """Build the effective configuration from the JSON file and the environment.

    The built-in defaults are used as a base, the content of the JSON file (if
    it exists) is merged on top of them section by section, and finally the
    environment variables override individual values. Merging onto the
    defaults guarantees that the ``ollama``, ``site`` and ``auth`` sections
    and their keys always exist, even with a partially filled file.

    Args:
        config_path: Location of the JSON file. Defaults to ``CONFIG_PATH``.

    Returns:
        The configuration dictionary with ``ollama``, ``site`` and ``auth``
        sections (plus any extra section found in the file).

    Raises:
        ValueError: If the file (or the ``AUTH_USERS`` variable) does not hold
            valid JSON, or if the file's top-level value is not an object.
    """
    path = CONFIG_PATH if config_path is None else Path(config_path)

    config = {
        "ollama": {"url": "", "cluster_model": "", "local_model": ""},
        "site": {"calendar_url": "", "base_url": ""},
        "auth": {"session_secret": "changez-cette-cle-secrete-en-production", "users": []},
    }

    if path.is_file():
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(path, encoding="utf-8") as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            raise ValueError(f"{path}: top-level JSON value must be an object")
        for section, values in loaded.items():
            if isinstance(values, dict) and isinstance(config.get(section), dict):
                # Keep the default keys the file does not mention.
                config[section].update(values)
            else:
                config[section] = values

    # Environment variables take precedence over the JSON file.
    overrides = (
        ("OLLAMA_URL", "ollama", "url"),
        ("OLLAMA_CLUSTER_MODEL", "ollama", "cluster_model"),
        ("OLLAMA_LOCAL_MODEL", "ollama", "local_model"),
        ("SITE_CALENDAR_URL", "site", "calendar_url"),
        ("SITE_BASE_URL", "site", "base_url"),
        ("AUTH_SESSION_SECRET", "auth", "session_secret"),
    )
    for env_name, section, key in overrides:
        value = os.getenv(env_name)
        if value:  # unset and empty variables are both ignored
            config[section][key] = value

    # AUTH_USERS carries a JSON document (the list of user entries).
    users_json = os.getenv("AUTH_USERS")
    if users_json:
        config["auth"]["users"] = json.loads(users_json)
    return config
# ── App ───────────────────────────────────────────────────────────────────────
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app):
    """Create the data sub-directories at startup, then yield to the app."""
    # Make sure every storage location exists before requests are served.
    for subdir in ("cache", "jobs", "uploads"):
        DATA_DIR.joinpath(subdir).mkdir(parents=True, exist_ok=True)
    yield
# Application object. docs_url/redoc_url are set to None (FastAPI then serves
# no Swagger UI / ReDoc pages); `lifespan` creates the data directories.
app = FastAPI(title="planning2ics", docs_url=None, redoc_url=None, lifespan=lifespan)
# Front-end assets are served from /app/static under the /static prefix.
app.mount("/static", StaticFiles(directory="/app/static"), name="static")
# ── Auth ──────────────────────────────────────────────────────────────────────
# In-memory session store: entries are added by the login endpoint and removed
# by the logout endpoint. Being a plain module-level dict, it is local to this
# process.
sessions: dict[str, str] = {} # token → username
def get_current_user(session: Optional[str] = Cookie(default=None)) -> str:
    """Resolve the ``session`` cookie to a username.

    Returns:
        The username stored in ``sessions`` for this token.

    Raises:
        HTTPException: 401 when the cookie is missing, empty or unknown.
    """
    authenticated = bool(session) and session in sessions
    if authenticated:
        return sessions[session]
    raise HTTPException(status_code=401, detail="Non authentifié")
# ── Pages ─────────────────────────────────────────────────────────────────────
@app.get("/")
async def root():
    """Serve the single-page front-end."""
    index_page = "/app/static/index.html"
    return FileResponse(index_page)
@app.get("/api/health")
async def health():
    """Liveness probe: always answers with a fixed ``ok`` status."""
    payload = {"status": "ok"}
    return payload
# ── Auth endpoints ────────────────────────────────────────────────────────────
@app.post("/api/auth/login")
async def login(request: Request, response: Response):
    """Authenticate a user and open a session.

    Expects a JSON object with ``username`` and ``password``. On success a
    random token is stored in ``sessions`` and sent back as the ``session``
    cookie (HTTP-only, SameSite=lax, valid 7 days).

    Returns:
        ``{"ok": True, "username": <username>}`` on success.

    Raises:
        HTTPException: 400 when the body is not a JSON object, 401 when the
            credentials do not match any configured user.
    """
    # The body is untrusted: reject anything that is not a JSON object instead
    # of failing with an unhandled exception.
    try:
        data = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Corps JSON invalide") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Corps JSON invalide")

    config = load_config()
    username = data.get("username", "")
    password = data.get("password", "")

    if isinstance(username, str) and isinstance(password, str):
        # compare_digest() only accepts ASCII str, so compare bytes.
        # "surrogatepass" keeps lone surrogates (legal in JSON) from raising.
        given_user = username.encode("utf-8", "surrogatepass")
        given_pass = password.encode("utf-8", "surrogatepass")
        for user in config["auth"]["users"]:
            if not isinstance(user, dict):
                continue  # malformed entry: ignore rather than crash
            known_user = user.get("username")
            known_pass = user.get("password")
            if not (isinstance(known_user, str) and isinstance(known_pass, str)):
                continue
            # Constant-time comparisons; both are always evaluated so that the
            # response time does not reveal which field was wrong.
            user_ok = secrets.compare_digest(
                known_user.encode("utf-8", "surrogatepass"), given_user
            )
            pass_ok = secrets.compare_digest(
                known_pass.encode("utf-8", "surrogatepass"), given_pass
            )
            if user_ok and pass_ok:
                token = secrets.token_hex(32)
                sessions[token] = username
                # NOTE(review): the cookie has no `secure` flag; consider
                # adding it if the app is only ever served over HTTPS.
                response.set_cookie(
                    key="session", value=token,
                    httponly=True, samesite="lax", max_age=86400 * 7
                )
                return {"ok": True, "username": username}

    raise HTTPException(status_code=401, detail="Identifiants incorrects")
@app.post("/api/auth/logout")
async def logout(response: Response, session: Optional[str] = Cookie(default=None)):
    """Close the current session (if any) and clear the ``session`` cookie."""
    if session:
        # pop() with a default is a no-op for unknown tokens.
        sessions.pop(session, None)
    response.delete_cookie("session")
    return {"ok": True}
@app.get("/api/auth/me")
async def me(user: str = Depends(get_current_user)):
    """Return the username attached to the caller's session."""
    identity = {"username": user}
    return identity
# ── Config publique ───────────────────────────────────────────────────────────
@app.get("/api/config")
async def public_config(user: str = Depends(get_current_user)):
    """Expose the Ollama settings of the configuration to a logged-in user."""
    ollama = load_config()["ollama"]
    ollama_url = ollama["url"]
    cluster_model = ollama["cluster_model"]
    local_model = ollama["local_model"]
    return {
        "ollama_url": ollama_url,
        "cluster_model": cluster_model,
        "local_model": local_model,
    }
# ── Traitement PDF ────────────────────────────────────────────────────────────
# In-memory job registry: job_id → record holding "status", "queue", "result",
# "created_at", "pdf_names" and "user" (filled in by start_processing).
jobs: dict[str, dict] = {}
@app.post("/api/process")
async def start_processing(
    files: list[UploadFile],
    user: str = Depends(get_current_user),
):
    """Store the uploaded PDFs and start a background processing job.

    Files whose name does not end in ``.pdf`` (case-insensitive) are ignored.
    The accepted files are written under ``uploads/<job_id>/`` and the job is
    registered in ``jobs`` before the processing task is scheduled.

    Returns:
        ``{"job_id": <uuid>}`` identifying the new job.

    Raises:
        HTTPException: 400 when no valid PDF was supplied.
    """
    job_id = str(uuid.uuid4())
    queue = asyncio.Queue()
    upload_dir = DATA_DIR / "uploads" / job_id

    saved_paths = []
    pdf_names = []
    for file in files:
        # The filename is client-controlled (and may be missing): keep only
        # its last path component so it cannot escape upload_dir.
        safe_name = Path(file.filename or "").name
        if not safe_name.lower().endswith('.pdf'):
            continue
        # Created lazily so that a rejected request leaves no empty directory.
        upload_dir.mkdir(parents=True, exist_ok=True)
        dest = upload_dir / safe_name
        dest.write_bytes(await file.read())
        saved_paths.append(dest)
        pdf_names.append(safe_name)

    if not saved_paths:
        raise HTTPException(400, "Aucun fichier PDF valide fourni")

    jobs[job_id] = {
        "status": "running",
        "queue": queue,
        "result": None,
        "created_at": datetime.now().isoformat(),
        "pdf_names": pdf_names,
        "user": user,
    }
    # The event loop only keeps weak references to tasks: store the task in
    # the job record so it cannot be garbage-collected while running.
    jobs[job_id]["task"] = asyncio.create_task(
        _run_processing(job_id, saved_paths, queue)
    )
    return {"job_id": job_id}
async def _run_processing(job_id: str, pdf_paths: list, queue: asyncio.Queue):
    """Run the PDF processing for one job and publish its outcome.

    ``core.process_pdfs`` is executed in the default executor; its progress
    messages are pushed onto ``queue``. On success the generated ICS files and
    a ``metadata.json`` are written under ``jobs/<job_id>/`` and a ``done``
    event is queued; on any exception the job is flagged ``error`` and an
    ``error`` event is queued instead.
    """
    loop = asyncio.get_running_loop()
    config = load_config()

    def log(msg: str):
        # Invoked from the worker thread: hand the put() over to the loop.
        progress = {"type": "progress", "message": msg}
        asyncio.run_coroutine_threadsafe(queue.put(progress), loop)

    def run_core():
        return core.process_pdfs(pdf_paths, config, DATA_DIR, log)

    try:
        result = await loop.run_in_executor(None, run_core)

        # Persist the ICS files
        output_dir = DATA_DIR / "jobs" / job_id
        output_dir.mkdir(parents=True, exist_ok=True)
        series_list = []
        for series_title, data in result.items():
            ics_file = output_dir / data['filename']
            ics_file.write_bytes(data['bytes'])
            entry = {
                "name": series_title,
                "filename": data['filename'],
                "event_count": data['event_count'],
            }
            series_list.append(entry)

        # Summary stored next to the ICS files.
        record = jobs[job_id]
        meta = {
            "job_id": job_id,
            "created_at": record["created_at"],
            "pdf_names": record["pdf_names"],
            "series": series_list,
        }
        meta_text = json.dumps(meta, ensure_ascii=False, indent=2)
        (output_dir / "metadata.json").write_text(meta_text)

        record["status"] = "done"
        record["result"] = series_list
        await queue.put({"type": "done", "series": series_list})
    except Exception as exc:
        import traceback
        traceback.print_exc()
        jobs[job_id]["status"] = "error"
        await queue.put({"type": "error", "message": str(exc)})
@app.get("/api/progress/{job_id}")
async def progress_stream(job_id: str, user: str = Depends(get_current_user)):
    """Stream the events of a job as Server-Sent Events.

    Each queued message is sent as one ``data:`` line; a ``ping`` event is
    sent whenever nothing arrives for 30 seconds. The stream ends after a
    ``done`` or ``error`` message.

    Raises:
        HTTPException: 404 when the job is not in the in-memory registry.
    """
    if job_id not in jobs:
        raise HTTPException(404, "Job introuvable")

    async def event_stream():
        pending = jobs[job_id]["queue"]
        terminal_types = ("done", "error")
        while True:
            try:
                event = await asyncio.wait_for(pending.get(), timeout=30)
            except asyncio.TimeoutError:
                # Nothing to report yet: emit a ping and keep waiting.
                yield "data: {\"type\":\"ping\"}\n\n"
                continue
            payload = json.dumps(event, ensure_ascii=False)
            yield f"data: {payload}\n\n"
            if event["type"] in terminal_types:
                return

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
# ── Jobs ──────────────────────────────────────────────────────────────────────
@app.get("/api/jobs")
async def list_jobs(user: str = Depends(get_current_user)):
    """List the metadata of every stored job, most recently modified first.

    Job directories without a ``metadata.json`` are skipped. A metadata file
    that cannot be read or parsed is logged and skipped as well, so that one
    damaged job does not make the whole listing fail.

    Returns:
        A list of the parsed ``metadata.json`` documents.
    """
    import logging

    result = []
    jobs_dir = DATA_DIR / "jobs"
    if not jobs_dir.exists():
        return result

    by_recency = sorted(
        jobs_dir.iterdir(), key=lambda p: p.stat().st_mtime, reverse=True
    )
    for d in by_recency:
        meta = d / "metadata.json"
        if not meta.exists():
            continue
        try:
            result.append(json.loads(meta.read_text()))
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            logging.getLogger(__name__).warning(
                "Skipping unreadable job metadata: %s", meta
            )
    return result
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str, user: str = Depends(get_current_user)):
    """Return the state of one job.

    Jobs known to the in-memory registry are answered from it (including
    their current status); otherwise the ``metadata.json`` stored on disk for
    that job is returned.

    Raises:
        HTTPException: 404 when the job is unknown in memory and on disk.
    """
    if job_id in jobs:
        j = jobs[job_id]
        return {
            "job_id": job_id,
            "status": j["status"],
            "pdf_names": j["pdf_names"],
            "series": j.get("result"),
            "created_at": j["created_at"],
        }

    # Security: job_id comes from the URL. It must be a single plain path
    # component, otherwise (e.g. "..") the lookup would leave the jobs folder.
    if Path(job_id).name != job_id or job_id == "..":
        raise HTTPException(404, "Job introuvable")

    meta = DATA_DIR / "jobs" / job_id / "metadata.json"
    if meta.is_file():
        return json.loads(meta.read_text())
    raise HTTPException(404, "Job introuvable")
@app.get("/api/download/{job_id}/{filename}")
async def download_ics(
    job_id: str, filename: str, user: str = Depends(get_current_user)
):
    """Send one generated ICS file of a job as an attachment.

    The download name is transmitted with the RFC 5987 ``filename*`` form so
    that non-ASCII names are valid in the HTTP header.

    Raises:
        HTTPException: 404 when the path parameters are not plain names or
            when no such file exists.
    """
    # Security: prevent path traversal. Both parameters come from the URL and
    # must each designate a single plain path component. Note that
    # Path("..").name is "..", so that value has to be rejected explicitly.
    filename = Path(filename).name
    if Path(job_id).name != job_id or job_id == ".." or filename in ("", ".."):
        raise HTTPException(404, "Fichier introuvable")

    ics_path = DATA_DIR / "jobs" / job_id / filename
    # is_file() rather than exists(): a directory cannot be served and would
    # otherwise fail later with a server error instead of a 404.
    if not ics_path.is_file():
        raise HTTPException(404, "Fichier introuvable")

    encoded = quote(filename, safe='')
    return FileResponse(
        ics_path, media_type="text/calendar",
        headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded}"},
    )
# ── Cache ─────────────────────────────────────────────────────────────────────
@app.get("/api/cache/status")
async def cache_status(user: str = Depends(get_current_user)):
    """Report whether the website catalog and series mapping caches exist."""
    cache_dir = DATA_DIR / "cache"
    website_file = cache_dir / "website_catalog.json"
    series_file = cache_dir / "series_mapping.json"
    return {
        "website_cached": website_file.exists(),
        "series_cached": series_file.exists(),
    }
@app.delete("/api/cache")
async def clear_cache(user: str = Depends(get_current_user)):
    """Delete the known cache files and report which ones were removed.

    Returns:
        ``{"deleted": [...]}`` with the names of the files that existed.
    """
    cache_dir = DATA_DIR / "cache"
    cache_files = (
        "website_catalog.json",
        "series_mapping.json",
        "series_clusters.json",
        "series_site_match.json",
    )
    deleted = []
    for name in cache_files:
        target = cache_dir / name
        if not target.exists():
            continue
        target.unlink()
        deleted.append(name)
    return {"deleted": deleted}
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces.
    import uvicorn

    listen = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **listen)