FastAPI Background Tasks + Webhooks: Build a Reliable “Do Work Later” Pipeline (Without Celery)
Many web apps need to accept a request quickly, then do slower work afterward: generating PDFs, resizing images, syncing to a third-party API, or sending emails. If you try to do that work inline, you'll inflate latency and risk request timeouts. If you jump straight to a big queue system, you might overcomplicate things.
This article shows a practical middle ground for junior/mid developers: a FastAPI service that (1) returns fast, (2) processes jobs in the background, and (3) notifies other systems via webhooks with retries. You’ll build a small “job queue” using Postgres + a worker loop—no Celery required.
- Accept a job: `POST /jobs` returns immediately
- Process in background: a separate worker runs continuously
- Deliver results: call a customer-provided `webhook_url` with an HMAC signature
- Retry on failure: exponential backoff + max attempts
Why not just use BackgroundTasks?
FastAPI's `BackgroundTasks` is great for tiny tasks (log an event, fire-and-forget email), but it runs in the same process as your API. If your server restarts, you can lose work. It also doesn't give you persistence, retries, or visibility.
The pattern below is “production-shaped” while still lightweight: you persist jobs in a database and process them with a worker process you can scale independently.
Project layout
```
app/
  main.py
  db.py
  models.py
  schemas.py
  worker.py
  webhook.py
requirements.txt
```
requirements.txt:
```
fastapi==0.115.0
uvicorn[standard]==0.30.6
sqlalchemy==2.0.32
psycopg[binary]==3.2.1
pydantic==2.8.2
httpx==0.27.2
```
1) Define a durable job model
We’ll store each job in Postgres with status, attempt counts, and the webhook to call.
```python
# app/models.py
import enum
from datetime import datetime

from sqlalchemy import String, Text, DateTime, Integer, Enum, JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class JobStatus(str, enum.Enum):
    queued = "queued"
    processing = "processing"
    succeeded = "succeeded"
    failed = "failed"


class Job(Base):
    __tablename__ = "jobs"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    status: Mapped[JobStatus] = mapped_column(
        Enum(JobStatus), default=JobStatus.queued, index=True
    )

    # What work to do (keep it simple: a "type" + JSON payload)
    job_type: Mapped[str] = mapped_column(String(50), index=True)
    payload: Mapped[dict] = mapped_column(JSON)

    # Webhook delivery
    webhook_url: Mapped[str] = mapped_column(Text)
    webhook_event: Mapped[str] = mapped_column(String(100), default="job.completed")

    # Result and errors
    result: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    last_error: Mapped[str | None] = mapped_column(Text, nullable=True)

    # Retry tracking
    attempts: Mapped[int] = mapped_column(Integer, default=0)
    max_attempts: Mapped[int] = mapped_column(Integer, default=8)
    next_run_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, index=True
    )

    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
    )
```
Database setup:
```python
# app/db.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = os.getenv(
    "DATABASE_URL", "postgresql+psycopg://postgres:postgres@localhost:5432/jobsdb"
)

engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
```
Create tables once (in a real app, use migrations):
```python
# create_tables.py
from app.db import engine
from app.models import Base

Base.metadata.create_all(bind=engine)
print("Tables created.")
```
2) Pydantic schemas for safe input/output
```python
# app/schemas.py
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, HttpUrl


class JobCreate(BaseModel):
    job_type: str = Field(min_length=1, max_length=50)
    payload: dict[str, Any]
    webhook_url: HttpUrl


class JobOut(BaseModel):
    # Pydantic v2 style; lets us return ORM objects directly
    model_config = ConfigDict(from_attributes=True)

    id: int
    status: str
    job_type: str
    payload: dict[str, Any]
    result: dict[str, Any] | None = None
    last_error: str | None = None
```
3) FastAPI endpoints: create and inspect jobs
`POST /jobs` returns quickly with a job id. The worker handles processing later.
```python
# app/main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session

from app.db import SessionLocal
from app.models import Job, JobStatus
from app.schemas import JobCreate, JobOut

app = FastAPI(title="Jobs + Webhooks (Lightweight)")


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/jobs", response_model=JobOut, status_code=201)
def create_job(data: JobCreate, db: Session = Depends(get_db)):
    job = Job(
        job_type=data.job_type,
        payload=data.payload,
        webhook_url=str(data.webhook_url),
        status=JobStatus.queued,
    )
    db.add(job)
    db.commit()
    db.refresh(job)
    return job


@app.get("/jobs/{job_id}", response_model=JobOut)
def get_job(job_id: int, db: Session = Depends(get_db)):
    job = db.get(Job, job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return job
```
4) Add webhook signing (HMAC) for trust
If you call someone’s webhook, they should verify it came from you. A common approach is an HMAC signature over the raw body.
```python
# app/webhook.py
import hashlib
import hmac
import json
import os

import httpx

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "dev_secret_change_me")


def sign_payload(payload_bytes: bytes) -> str:
    digest = hmac.new(
        WEBHOOK_SECRET.encode("utf-8"), payload_bytes, hashlib.sha256
    ).hexdigest()
    return f"sha256={digest}"


async def deliver_webhook(url: str, event: str, data: dict) -> tuple[bool, str | None]:
    body = {"event": event, "data": data}
    raw = json.dumps(body, separators=(",", ":"), sort_keys=True).encode("utf-8")
    signature = sign_payload(raw)
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": signature,
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(url, content=raw, headers=headers)
        if 200 <= resp.status_code < 300:
            return True, None
        return False, f"Non-2xx response: {resp.status_code}"
    except Exception as e:
        return False, str(e)
```
On the receiver side (another service), verification looks like this:
```python
import hashlib
import hmac


def verify_signature(secret: str, raw_body: bytes, header_value: str) -> bool:
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(header_value, f"sha256={expected}")
```
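You can sanity-check the whole scheme end to end without any HTTP at all. The sketch below re-implements both sides with only the standard library, using the dev default secret from above as a stand-in:

```python
import hashlib
import hmac
import json

SECRET = "dev_secret_change_me"  # must match the sender's WEBHOOK_SECRET


def sign(raw: bytes) -> str:
    # What the sender puts in X-Webhook-Signature
    return "sha256=" + hmac.new(SECRET.encode(), raw, hashlib.sha256).hexdigest()


def verify_signature(secret: str, raw_body: bytes, header_value: str) -> bool:
    # What the receiver recomputes over the raw body
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(header_value, f"sha256={expected}")


# Round-trip: what the sender produces, the receiver accepts
raw = json.dumps(
    {"event": "job.completed", "data": {"job_id": 1}},
    separators=(",", ":"), sort_keys=True,
).encode()
assert verify_signature(SECRET, raw, sign(raw))

# Even a one-byte change to the body fails verification
assert not verify_signature(SECRET, raw + b" ", sign(raw))
```

Note that the receiver must verify the raw request bytes, not a re-serialized parse of them; any difference in whitespace or key order produces a different digest.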
5) The worker: claim jobs safely and retry with backoff
This worker process polls for runnable jobs, marks one as processing, performs the work, and then tries webhook delivery. If delivery fails, it reschedules with exponential backoff.
Important: we'll use a database transaction to "claim" a job. In production, use `SELECT ... FOR UPDATE SKIP LOCKED` so that multiple workers never take the same job; SQLAlchemy exposes this via `with_for_update(skip_locked=True)`.
```python
# app/worker.py
import asyncio
from datetime import datetime, timedelta

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.db import SessionLocal
from app.models import Job, JobStatus
from app.webhook import deliver_webhook

POLL_INTERVAL_SECONDS = 1


def backoff_seconds(attempt: int) -> int:
    # attempt starts at 1; cap the delay at 5 minutes
    return min(300, 2 ** min(attempt, 8))


async def process_job(job: Job) -> dict:
    # Example job types (replace with your real work)
    if job.job_type == "uppercase":
        text = str(job.payload.get("text", ""))
        await asyncio.sleep(0.2)  # simulate work
        return {"upper": text.upper()}
    if job.job_type == "sum":
        nums = job.payload.get("numbers", [])
        total = sum(float(x) for x in nums)
        return {"sum": total}
    raise ValueError(f"Unknown job_type: {job.job_type}")


def claim_one_job(db: Session) -> Job | None:
    now = datetime.utcnow()
    stmt = (
        select(Job)
        .where(Job.status.in_([JobStatus.queued, JobStatus.failed]))
        .where(Job.next_run_at <= now)
        .order_by(Job.next_run_at.asc(), Job.id.asc())
        .with_for_update(skip_locked=True)
        .limit(1)
    )
    job = db.execute(stmt).scalars().first()
    if not job:
        return None
    job.status = JobStatus.processing
    db.add(job)
    return job


async def worker_loop():
    while True:
        db = SessionLocal()
        try:
            # Claim inside a short transaction so the row lock is released
            # as soon as the status flips to "processing"
            with db.begin():
                job = claim_one_job(db)
            if not job:
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
                continue

            # Do work + webhook delivery outside the claiming transaction
            job.attempts += 1
            try:
                result = await process_job(job)
                job.result = result
                ok, err = await deliver_webhook(
                    url=job.webhook_url,
                    event=job.webhook_event,
                    data={"job_id": job.id, "status": "succeeded", "result": result},
                )
                if ok:
                    job.status = JobStatus.succeeded
                    job.last_error = None
                else:
                    raise RuntimeError(f"Webhook delivery failed: {err}")
            except Exception as e:
                job.last_error = str(e)
                job.status = JobStatus.failed
                if job.attempts >= job.max_attempts:
                    # Park the job far in the future: effectively never retried
                    job.next_run_at = datetime.utcnow() + timedelta(days=3650)
                else:
                    delay = backoff_seconds(job.attempts)
                    job.next_run_at = datetime.utcnow() + timedelta(seconds=delay)

            db.add(job)
            db.commit()
        finally:
            db.close()


if __name__ == "__main__":
    asyncio.run(worker_loop())
```
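It's worth seeing the concrete retry schedule the backoff function produces. Reproducing `backoff_seconds` on its own:

```python
def backoff_seconds(attempt: int) -> int:
    # attempt starts at 1; the exponent is capped at 8, the delay at 5 minutes
    return min(300, 2 ** min(attempt, 8))


# Delay (in seconds) before each retry, for attempts 1 through 10
schedule = [backoff_seconds(a) for a in range(1, 11)]
print(schedule)  # [2, 4, 8, 16, 32, 64, 128, 256, 256, 256]
```

With the default `max_attempts = 8`, a persistently failing job burns through its retries in roughly 8.5 minutes of cumulative delay before being marked failed for good.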
Run it locally
- Start Postgres (Docker or local)
- Create tables: `python create_tables.py`
- Run the API: `uvicorn app.main:app --reload`
- Run the worker: `python -m app.worker`
Submit a job:
```bash
curl -X POST http://127.0.0.1:8000/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "job_type": "uppercase",
    "payload": {"text": "hello webhook"},
    "webhook_url": "https://webhook.site/your-id"
  }'
```
Then check status:
```bash
curl http://127.0.0.1:8000/jobs/1
```
Practical tips to make this “real enough”
- Keep job payloads small: store large blobs (files) in object storage and reference them by URL/key in `payload`.
- Add idempotency: accept an `Idempotency-Key` header on `POST /jobs` so clients can safely retry without creating duplicate jobs.
- Observe everything: log `job_id`, `attempts`, and webhook response codes. A "jobs dashboard" page later becomes trivial.
- Scale workers independently: start with one worker; add more replicas if the queue grows. The `SKIP LOCKED` pattern helps avoid double-processing.
- Webhook timeouts matter: keep webhook timeouts short (e.g., 5–10s). If recipients need long work, they should accept fast and do their own async processing.
Where to go next
This pattern gets you the reliability basics—durable jobs, retries, and signed webhooks—using tools you already have. When you outgrow it (multiple job priorities, delayed schedules, dead-letter queues, complex fan-out), you can migrate gradually to a dedicated queue system. But you don’t need to start there to ship a solid async pipeline today.