FastAPI File Uploads That Don’t Hurt: Validation, Streaming Saves, and Background Processing

File uploads look simple until they hit production: users upload huge files, your server memory spikes, filenames collide, requests time out, and you’re left with half-written blobs and no metadata. In this hands-on guide, you’ll build a small, practical FastAPI service that:

  • Accepts multipart uploads safely (UploadFile + size limits)
  • Saves files to disk without reading everything into memory
  • Computes a SHA-256 hash and basic metadata
  • Runs “processing” work in the background (so uploads respond fast)
  • Exposes endpoints to check status and download files

This pattern is common in real apps: uploading invoices, profile images, CSVs, or user-generated documents.

Project Setup

Create a folder and install dependencies:

```bash
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install fastapi uvicorn python-multipart
```

File structure:

```
upload_service/
  app.py
  storage/    # created automatically
  db.json     # created automatically
```

We’ll use a tiny JSON “database” (good for learning). In a real service you’d likely use Postgres + S3, but the API flow stays the same.

The Core App: Upload, Track, Download

Create app.py:

```python
from __future__ import annotations

import hashlib
import json
import os
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

from fastapi import BackgroundTasks, FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel

APP_ROOT = Path(__file__).parent
STORAGE_DIR = APP_ROOT / "storage"
DB_PATH = APP_ROOT / "db.json"

# Basic limits (tune for your environment)
MAX_BYTES = 10 * 1024 * 1024  # 10 MB
ALLOWED_CONTENT_TYPES = {
    "text/plain",
    "text/csv",
    "application/pdf",
    "image/png",
    "image/jpeg",
}

STORAGE_DIR.mkdir(exist_ok=True)

app = FastAPI(title="Upload Service", version="1.0")


class UploadRecord(BaseModel):
    id: str
    original_name: str
    content_type: str
    size_bytes: int
    sha256: Optional[str] = None
    stored_path: str
    status: str  # "uploaded" | "processing" | "ready" | "failed"
    created_at: float
    error: Optional[str] = None


def _load_db() -> Dict[str, Any]:
    if not DB_PATH.exists():
        return {"uploads": {}}
    return json.loads(DB_PATH.read_text(encoding="utf-8"))


def _save_db(data: Dict[str, Any]) -> None:
    DB_PATH.write_text(json.dumps(data, indent=2), encoding="utf-8")


def _safe_filename(name: str) -> str:
    # Minimal filename sanitation: keep basename, replace spaces, drop odd characters
    base = os.path.basename(name).strip().replace(" ", "_")
    keep = "".join(ch for ch in base if ch.isalnum() or ch in {"_", "-", "."})
    return keep or "upload.bin"


def _stream_save_upload(upload: UploadFile, dest_path: Path) -> int:
    """Stream from UploadFile to disk in chunks. Returns total bytes written."""
    total = 0
    chunk_size = 1024 * 1024  # 1 MB chunks
    with dest_path.open("wb") as out:
        while True:
            chunk = upload.file.read(chunk_size)
            if not chunk:
                break
            total += len(chunk)
            if total > MAX_BYTES:
                raise ValueError(f"File too large (max {MAX_BYTES} bytes)")
            out.write(chunk)
    return total


def _compute_sha256(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def process_upload(upload_id: str) -> None:
    """
    Background "processing": compute SHA-256 and simulate work.
    If you do CPU-heavy stuff (PDF parsing, image resizing), consider a task queue.
    """
    db = _load_db()
    rec = db["uploads"].get(upload_id)
    if not rec:
        return
    rec["status"] = "processing"
    _save_db(db)
    try:
        path = Path(rec["stored_path"])
        # Simulate real processing time
        time.sleep(0.5)
        rec["sha256"] = _compute_sha256(path)
        rec["status"] = "ready"
        rec["error"] = None
    except Exception as e:
        rec["status"] = "failed"
        rec["error"] = str(e)
    _save_db(db)


@app.post("/uploads", response_model=UploadRecord)
async def create_upload(background: BackgroundTasks, file: UploadFile = File(...)):
    if file.content_type not in ALLOWED_CONTENT_TYPES:
        raise HTTPException(
            status_code=415,
            detail=f"Unsupported content type: {file.content_type}",
        )

    upload_id = str(uuid.uuid4())
    safe_name = _safe_filename(file.filename or "upload.bin")
    stored_name = f"{upload_id}__{safe_name}"
    stored_path = STORAGE_DIR / stored_name

    try:
        size_bytes = _stream_save_upload(file, stored_path)
    except ValueError as e:
        # Clean up partial file
        if stored_path.exists():
            stored_path.unlink(missing_ok=True)
        raise HTTPException(status_code=413, detail=str(e))
    except Exception as e:
        if stored_path.exists():
            stored_path.unlink(missing_ok=True)
        raise HTTPException(status_code=500, detail=f"Failed to save upload: {e}")
    finally:
        await file.close()

    record = UploadRecord(
        id=upload_id,
        original_name=file.filename or "upload.bin",
        content_type=file.content_type,
        size_bytes=size_bytes,
        sha256=None,
        stored_path=str(stored_path),
        status="uploaded",
        created_at=time.time(),
        error=None,
    )

    db = _load_db()
    db["uploads"][upload_id] = record.model_dump()
    _save_db(db)

    # Background processing (fast response to client)
    background.add_task(process_upload, upload_id)
    return record


@app.get("/uploads/{upload_id}", response_model=UploadRecord)
async def get_upload(upload_id: str):
    db = _load_db()
    rec = db["uploads"].get(upload_id)
    if not rec:
        raise HTTPException(status_code=404, detail="Upload not found")
    return rec


@app.get("/uploads/{upload_id}/download")
async def download_upload(upload_id: str):
    db = _load_db()
    rec = db["uploads"].get(upload_id)
    if not rec:
        raise HTTPException(status_code=404, detail="Upload not found")
    if rec["status"] != "ready":
        raise HTTPException(status_code=409, detail=f"File not ready (status={rec['status']})")
    path = Path(rec["stored_path"])
    if not path.exists():
        raise HTTPException(status_code=410, detail="File missing from storage")
    # Give the original filename back to the browser
    filename = _safe_filename(rec["original_name"])
    return FileResponse(path, media_type=rec["content_type"], filename=filename)
```

Run It Locally

Start the server:

```bash
uvicorn app:app --reload --port 8000
```

Try an upload using curl:

```bash
# Upload a file
curl -F "file=@./somefile.csv" http://localhost:8000/uploads
```

You’ll get JSON back with an id. Then poll for status:

```bash
curl http://localhost:8000/uploads/<ID>
```
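The status response mirrors the UploadRecord model. Once processing finishes, the record looks roughly like this (the id, hash, size, and timestamp are made up for illustration):

```json
{
  "id": "b1946ac9-2f3e-4c0a-9f1e-0a1b2c3d4e5f",
  "original_name": "somefile.csv",
  "content_type": "text/csv",
  "size_bytes": 1834,
  "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
  "stored_path": "storage/b1946ac9-2f3e-4c0a-9f1e-0a1b2c3d4e5f__somefile.csv",
  "status": "ready",
  "created_at": 1700000000.0,
  "error": null
}
```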

Once status becomes ready, download it:

```bash
curl -L -o downloaded.bin http://localhost:8000/uploads/<ID>/download
```
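If you script this flow instead of using curl, the poll-until-ready loop is worth factoring out. Here's a minimal sketch; `wait_until_ready` and its injected `fetch` callable are my own names, not part of the service above:

```python
import time
from typing import Callable, Dict


def wait_until_ready(
    fetch: Callable[[], Dict],
    timeout: float = 10.0,
    interval: float = 0.25,
) -> Dict:
    """Poll fetch() until the record reaches a terminal status.

    fetch should return the upload record as a dict, e.g.
    requests.get(f"http://localhost:8000/uploads/{upload_id}").json()
    """
    deadline = time.monotonic() + timeout
    while True:
        rec = fetch()
        if rec["status"] in ("ready", "failed"):
            return rec
        if time.monotonic() >= deadline:
            raise TimeoutError(f"upload still {rec['status']} after {timeout}s")
        time.sleep(interval)
```

Injecting `fetch` keeps the helper free of any particular HTTP client, so it works with requests, httpx, or a test stub.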

Why This Approach Works (and Where Juniors Often Slip)

  • Don’t use await file.read() for big files. That reads everything into memory. Here we stream in chunks and enforce MAX_BYTES.

  • Validate content type early. It’s not perfect security (clients can lie), but it’s a useful first line of defense and user feedback.

  • Use generated IDs, not user filenames. Filenames collide, contain weird characters, and can cause path traversal issues if you’re not careful.

  • Split “upload” from “processing”. Users want a quick response. Hashing, parsing, or image resizing can happen after the upload completes.
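Because the declared Content-Type can lie, one cheap extra check is sniffing the file's leading bytes ("magic numbers") before trusting it. A minimal sketch; `MAGIC` and `looks_like` are illustrative helpers of my own, and the signature table is deliberately incomplete:

```python
# Leading-byte signatures for a few binary formats. Text types (plain, CSV)
# have no reliable signature, so they pass by default.
MAGIC = {
    "application/pdf": b"%PDF-",
    "image/png": b"\x89PNG\r\n\x1a\n",
    "image/jpeg": b"\xff\xd8\xff",
}


def looks_like(content_type: str, head: bytes) -> bool:
    """Return True if the leading bytes are consistent with content_type."""
    sig = MAGIC.get(content_type)
    return sig is None or head.startswith(sig)
```

In the upload endpoint you could read the first 16 bytes with `file.file.read(16)`, check them, then `file.file.seek(0)` before streaming to disk.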

Making Background Work Safer

FastAPI’s BackgroundTasks is perfect for lightweight work, but keep these realities in mind:

  • If the server restarts, in-process background work is lost. For important processing, use a task queue (Celery/RQ/Arq) and store jobs in Redis.

  • CPU-heavy processing can block workers. If you do heavy image/PDF work, run it in a separate worker process or queue.

  • Always persist state. We save status and error so clients can handle failures cleanly.

If you want a small step up without introducing a full queue, you can make the background task async and push the CPU-bound hashing onto a worker thread with asyncio.to_thread (still not durable, but it keeps the event loop responsive):

```python
import asyncio

async def process_upload(upload_id: str) -> None:
    # ... load record, mark processing (same as before) ...
    try:
        path = Path(rec["stored_path"])
        # Run hashing in a worker thread so the event loop stays free
        rec["sha256"] = await asyncio.to_thread(_compute_sha256, path)
        rec["status"] = "ready"
    except Exception as e:
        rec["status"] = "failed"
        rec["error"] = str(e)
    _save_db(db)
```

Note: BackgroundTasks accepts both sync and async callables. Sync tasks are run in Starlette's threadpool, while async tasks run on the event loop itself, which is why the asyncio.to_thread call matters in the async version. For heavier apps you'd still move processing to proper workers.

Hardening Checklist for Production

Before shipping, consider these practical upgrades:

  • Store metadata in a real database. JSON files don’t handle concurrent writes well. SQLite is a simple next step; Postgres is typical.

  • Use object storage. Disk works locally. In production, push uploads to S3-compatible storage and store only keys in the DB.

  • Scan uploads if needed. If users upload arbitrary files, integrate malware scanning (policy-dependent).

  • Add auth and quotas. Tie uploads to a user, rate limit, and enforce per-user storage limits.

  • Log with IDs. Include upload_id in logs so you can trace failures quickly.
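As a sketch of the first upgrade, here is the same metadata in SQLite via the standard library. The schema mirrors UploadRecord; the table name and helpers are my own, not part of the demo app:

```python
import sqlite3
from typing import Optional

def init_db(conn: sqlite3.Connection) -> None:
    # Same fields as the UploadRecord model, as a real table
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS uploads (
            id TEXT PRIMARY KEY,
            original_name TEXT NOT NULL,
            content_type TEXT NOT NULL,
            size_bytes INTEGER NOT NULL,
            sha256 TEXT,
            stored_path TEXT NOT NULL,
            status TEXT NOT NULL,
            created_at REAL NOT NULL,
            error TEXT
        )
        """
    )

def set_status(conn: sqlite3.Connection, upload_id: str,
               status: str, error: Optional[str] = None) -> None:
    # Single-row update replaces the read-modify-write of the whole JSON file
    conn.execute(
        "UPDATE uploads SET status = ?, error = ? WHERE id = ?",
        (status, error, upload_id),
    )

# Demo run against an in-memory database
conn = sqlite3.connect(":memory:")
init_db(conn)
conn.execute(
    "INSERT INTO uploads (id, original_name, content_type, size_bytes,"
    " stored_path, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
    ("abc123", "report.pdf", "application/pdf", 1024,
     "storage/abc123__report.pdf", "uploaded", 0.0),
)
set_status(conn, "abc123", "ready")
status = conn.execute(
    "SELECT status FROM uploads WHERE id = ?", ("abc123",)
).fetchone()[0]
```

Unlike the JSON file, SQLite serializes concurrent writers for you, and the query helpers drop in where _load_db/_save_db were called.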

Next Steps to Extend the Demo

Here are easy, real-world extensions you can build on top of this:

  • Add an endpoint DELETE /uploads/{id} to remove files and records.

  • Generate thumbnails for images after upload (Pillow) and store them as separate artifacts.

  • Parse CSVs in the background and store preview rows for UI display.

  • Return Location headers and use 202 Accepted to emphasize async processing semantics.

With these patterns—streaming saves, strict validation, and decoupled processing—you’ll avoid the most common upload pitfalls and have a solid base for production-grade file handling in FastAPI.

