FastAPI in Practice: Background Jobs + Webhooks + Idempotency (Hands-On)

FastAPI makes it easy to build clean APIs, but many real systems aren’t just “request → response.” You often need to accept a request quickly, do work later (sending emails, generating reports, calling third-party APIs), and notify other systems via webhooks. In this hands-on guide, you’ll build a small, production-shaped FastAPI service that:

  • Accepts a request and returns immediately
  • Runs a background job
  • Delivers a webhook to a subscriber
  • Handles retries safely with idempotency (no double-processing)

The examples are intentionally minimal and “copy/paste runnable” for junior/mid developers, while still teaching patterns you’ll reuse in real apps.

What you’ll build

You’ll create a tiny “invoice processing” API:

  • POST /invoices creates an invoice and schedules processing
  • Processing simulates work (e.g., compute totals, “charge card”), then emits a webhook event invoice.processed
  • A webhook delivery loop retries failures and records results
  • Idempotency prevents duplicate invoices if a client retries the same request

Install and run

Dependencies:

pip install fastapi uvicorn httpx pydantic

Run the server:

uvicorn main:app --reload

This tutorial uses an in-memory “database” (Python dicts) so it’s easy to understand. In production, you’d swap these pieces for PostgreSQL/Redis and a proper queue.

Core concepts (fast, practical)

  • Background jobs: run work after the response has been sent. FastAPI's BackgroundTasks handles this for short tasks that run in the same process as the API; it is not meant for heavy workloads.
  • Webhooks: your app calls another URL when something happens. You must handle failures and retries.
  • Idempotency keys: a client sends a unique key (often an HTTP header). If they retry due to timeouts, you return the same result instead of creating duplicates.
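To make the idempotency idea concrete before the full app, here is a minimal, framework-free sketch. The names `_STORE`, `fingerprint`, and `handle_once` are hypothetical and not part of the service built in this guide. It also adds one assumption the tutorial app does not implement: rejecting a key that is reused with a different request body.

```python
import hashlib
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical in-memory store: key -> (request fingerprint, stored response)
_STORE: Dict[str, Tuple[str, Dict[str, Any]]] = {}


def fingerprint(body: Dict[str, Any]) -> str:
    """Stable hash of the request body (keys sorted so field order doesn't matter)."""
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def handle_once(
    key: str,
    body: Dict[str, Any],
    create: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Any]:
    """Run create(body) at most once per key; replay the stored response afterwards."""
    fp = fingerprint(body)
    if key in _STORE:
        stored_fp, stored_response = _STORE[key]
        if stored_fp != fp:
            # Same key, different payload: refuse rather than guess what the client meant.
            raise ValueError("idempotency key reused with a different request body")
        return stored_response
    response = create(body)
    _STORE[key] = (fp, response)
    return response
```

In an HTTP API the `ValueError` branch would typically map to a 4xx response. Which status code to use is a design choice this sketch leaves open.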

Project: a single-file app you can run

Create main.py with the following code.

from fastapi import FastAPI, BackgroundTasks, Header, HTTPException
from pydantic import BaseModel, Field, AnyUrl
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import uuid
import asyncio
import httpx

app = FastAPI(title="Invoices + Webhooks + Idempotency")

# --- In-memory stores (swap for DB/Redis in real apps) ---
INVOICES: Dict[str, Dict[str, Any]] = {}
IDEMPOTENCY: Dict[str, Dict[str, Any]] = {}  # key -> {"invoice_id": "...", "response": {...}}
WEBHOOK_SUBSCRIBERS: List[Dict[str, Any]] = []  # {"id": "...", "url": "..."}
WEBHOOK_DELIVERIES: List[Dict[str, Any]] = []  # delivery logs


# --- Models ---
class InvoiceIn(BaseModel):
    customer_id: str = Field(..., min_length=1)
    amount_cents: int = Field(..., ge=1)
    currency: str = Field("USD", min_length=3, max_length=3)


class InvoiceOut(BaseModel):
    id: str
    status: str
    customer_id: str
    amount_cents: int
    currency: str
    created_at: str


class WebhookSubscriptionIn(BaseModel):
    url: AnyUrl


class WebhookSubscriptionOut(BaseModel):
    id: str
    url: AnyUrl


def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


# --- Helpers ---
async def deliver_webhook(event_type: str, payload: Dict[str, Any], max_attempts: int = 5) -> None:
    """
    Deliver an event to all subscribers with retry + exponential backoff.
    Logs results to WEBHOOK_DELIVERIES.
    """
    if not WEBHOOK_SUBSCRIBERS:
        return

    async with httpx.AsyncClient(timeout=5.0) as client:
        for sub in WEBHOOK_SUBSCRIBERS:
            delivery_id = str(uuid.uuid4())
            attempt = 0
            backoff = 1.0

            while attempt < max_attempts:
                attempt += 1
                try:
                    r = await client.post(
                        str(sub["url"]),
                        json={"type": event_type, "data": payload},
                        headers={
                            "User-Agent": "InvoiceService/1.0",
                            "X-Event-Type": event_type,
                            "X-Delivery-ID": delivery_id,
                            "X-Attempt": str(attempt),
                        },
                    )
                    if 200 <= r.status_code < 300:
                        WEBHOOK_DELIVERIES.append({
                            "delivery_id": delivery_id,
                            "subscriber_id": sub["id"],
                            "event_type": event_type,
                            "status": "delivered",
                            "attempt": attempt,
                            "timestamp": now_iso(),
                            "http_status": r.status_code,
                        })
                        break

                    # Non-2xx: treat as failure (will retry)
                    WEBHOOK_DELIVERIES.append({
                        "delivery_id": delivery_id,
                        "subscriber_id": sub["id"],
                        "event_type": event_type,
                        "status": "failed",
                        "attempt": attempt,
                        "timestamp": now_iso(),
                        "http_status": r.status_code,
                    })
                except Exception as e:
                    WEBHOOK_DELIVERIES.append({
                        "delivery_id": delivery_id,
                        "subscriber_id": sub["id"],
                        "event_type": event_type,
                        "status": "error",
                        "attempt": attempt,
                        "timestamp": now_iso(),
                        "error": str(e),
                    })

                # Exponential backoff (1s, 2s, 4s, ...); skip the wait after the final attempt
                if attempt < max_attempts:
                    await asyncio.sleep(backoff)
                    backoff *= 2


async def process_invoice(invoice_id: str) -> None:
    """
    Simulate work: mark invoice as processed, then emit webhook.
    """
    invoice = INVOICES.get(invoice_id)
    if not invoice:
        return

    # Prevent double-processing (basic guard)
    if invoice["status"] != "created":
        return

    # Simulate external work
    await asyncio.sleep(1.0)
    invoice["status"] = "processed"
    invoice["processed_at"] = now_iso()

    # Emit webhook
    await deliver_webhook(
        event_type="invoice.processed",
        payload={
            "invoice_id": invoice_id,
            "customer_id": invoice["customer_id"],
            "amount_cents": invoice["amount_cents"],
            "currency": invoice["currency"],
            "status": invoice["status"],
            "processed_at": invoice["processed_at"],
        },
    )


def schedule_processing(background: BackgroundTasks, invoice_id: str) -> None:
    """
    BackgroundTasks accepts both sync and async callables. An async function
    is awaited on the event loop after the response has been sent, so we can
    register process_invoice directly.
    In production, you'd push a job to a queue (Celery/RQ/Arq/etc).
    """
    background.add_task(process_invoice, invoice_id)


# --- Routes ---
@app.post("/webhooks/subscriptions", response_model=WebhookSubscriptionOut)
def create_subscription(body: WebhookSubscriptionIn):
    sub = {"id": str(uuid.uuid4()), "url": str(body.url)}
    WEBHOOK_SUBSCRIBERS.append(sub)
    return sub


@app.get("/webhooks/deliveries")
def list_deliveries():
    # newest first for readability
    return list(reversed(WEBHOOK_DELIVERIES))


@app.post("/invoices", response_model=InvoiceOut, status_code=201)
def create_invoice(
    body: InvoiceIn,
    background: BackgroundTasks,
    # Header() converts underscores to hyphens by default,
    # so this parameter reads the Idempotency-Key header.
    idempotency_key: Optional[str] = Header(default=None),
):
    """
    Idempotency:
    - Client sends Idempotency-Key header.
    - If we already processed that key, return the same response.
    - Otherwise create invoice and store mapping.
    """
    if not idempotency_key:
        raise HTTPException(status_code=400, detail="Missing Idempotency-Key header")

    existing = IDEMPOTENCY.get(idempotency_key)
    if existing:
        # return the same response as before
        return existing["response"]

    invoice_id = str(uuid.uuid4())
    invoice = {
        "id": invoice_id,
        "status": "created",
        "customer_id": body.customer_id,
        "amount_cents": body.amount_cents,
        "currency": body.currency,
        "created_at": now_iso(),
    }
    INVOICES[invoice_id] = invoice

    response = InvoiceOut(**invoice).model_dump()
    IDEMPOTENCY[idempotency_key] = {
        "invoice_id": invoice_id,
        "response": response,
        "created_at": now_iso(),
    }

    schedule_processing(background, invoice_id)
    return response


@app.get("/invoices/{invoice_id}", response_model=InvoiceOut)
def get_invoice(invoice_id: str):
    invoice = INVOICES.get(invoice_id)
    if not invoice:
        raise HTTPException(status_code=404, detail="Invoice not found")
    return InvoiceOut(**invoice)

Try it: create a webhook subscriber

To see deliveries, you need a receiver endpoint. The fastest way locally is to run a second tiny FastAPI app as a webhook sink.

Create receiver.py:

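from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import random

app = FastAPI(title="Webhook Receiver")


@app.post("/webhook")
async def webhook(req: Request):
    payload = await req.json()

    # Simulate occasional failure to test retries
    if random.random() < 0.3:
        # FastAPI does not support Flask-style (body, status) tuples,
        # so set the status code explicitly on the response.
        return JSONResponse(
            status_code=500,
            content={"ok": False, "note": "simulated failure"},
        )

    return {"ok": True, "received": payload}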

Run it:

uvicorn receiver:app --reload --port 9000

Now subscribe your main app to the receiver:

curl -X POST http://127.0.0.1:8000/webhooks/subscriptions \
  -H "Content-Type: application/json" \
  -d '{"url":"http://127.0.0.1:9000/webhook"}'

Try it: create invoices safely with idempotency

Create an invoice with an idempotency key:

curl -X POST http://127.0.0.1:8000/invoices \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: inv-001" \
  -d '{"customer_id":"cust_123","amount_cents":2500,"currency":"USD"}'

Immediately repeat the same call (same key). You should get the same invoice ID back, not a new invoice:

curl -X POST http://127.0.0.1:8000/invoices \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: inv-001" \
  -d '{"customer_id":"cust_123","amount_cents":2500,"currency":"USD"}'

Then check the invoice status:

curl http://127.0.0.1:8000/invoices/<PASTE_INVOICE_ID>

Within about 1–2 seconds the status flips from created to processed. Webhook delivery, including any retries, continues in the background after that.

Inspect webhook deliveries (and retries)

List delivery logs:

curl http://127.0.0.1:8000/webhooks/deliveries

You’ll see entries like:

  • status: delivered when the receiver responded 2xx (with attempt: 1 if the first try succeeded)
  • status: failed or error for non-2xx or network errors
  • Multiple attempts for the same delivery_id if retries were needed
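Reading raw log entries gets tedious once there are retries. As a rough sketch, a helper like the one below could collapse the list into one summary per delivery. The function name `summarize_deliveries` is hypothetical; it assumes only the `delivery_id`, `attempt`, and `status` fields that the delivery log entries in this guide already contain.

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarize_deliveries(logs: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Group log entries by delivery_id and report attempt count and final status."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in logs:
        grouped[entry["delivery_id"]].append(entry)

    summary: Dict[str, Dict[str, Any]] = {}
    for delivery_id, entries in grouped.items():
        # The endpoint returns newest first, so order by attempt number explicitly.
        entries.sort(key=lambda e: e["attempt"])
        summary[delivery_id] = {
            "attempts": len(entries),
            "final_status": entries[-1]["status"],
            "statuses": [e["status"] for e in entries],
        }
    return summary
```

You could feed it the JSON returned by GET /webhooks/deliveries to see at a glance which deliveries needed more than one try.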

Practical notes for real systems

This demo is intentionally small. Here’s how you harden it when it becomes “real work.”

  • Use a real job queue: BackgroundTasks runs inside your API process. If the process restarts, jobs vanish. For real workloads, enqueue jobs to Redis/RabbitMQ/SQS with Celery/RQ/Arq/etc.
  • Persist idempotency keys: store them in your database with a unique constraint on (key). Consider TTL/expiration.
  • Idempotency scope: decide what the key means. Often it’s “idempotent per endpoint + per customer.” That prevents collisions across users.
  • Sign webhook payloads: include X-Signature (HMAC) so receivers can verify authenticity. (Don’t rely on IP allowlists alone.)
  • Webhook replay and ordering: receivers should treat webhooks as “at least once.” They must dedupe using X-Delivery-ID (or event ID) and not assume strict ordering.
  • Retry strategy: exponential backoff is good; add jitter and a dead-letter queue for repeated failures.
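Of the hardening steps above, payload signing is the one that most benefits from seeing code. Here is a minimal sketch using only the standard library. It assumes HMAC-SHA256 with a hex digest in the X-Signature header and a shared secret per subscriber. The function names are hypothetical, and real providers differ in header format and often include a timestamp.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, Tuple


def sign_payload(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the exact bytes that will be sent."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    """Receiver side: recompute the signature and compare in constant time."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


def build_signed_request(
    secret: str, event_type: str, payload: Dict[str, Any]
) -> Tuple[bytes, Dict[str, str]]:
    """Sender side: serialize once, sign those bytes, and return body + headers."""
    body = json.dumps(
        {"type": event_type, "data": payload}, separators=(",", ":")
    ).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Signature": sign_payload(secret, body),
    }
    return body, headers
```

The important detail is that the signature covers the raw bytes. With httpx, that means sending the body via `content=body` rather than `json=...`, so the bytes on the wire are exactly the ones that were signed. The receiver should verify against the raw request body before parsing it.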

Common mistakes (and how to avoid them)

  • Creating duplicates on client retries: if your mobile client retries on timeout, you’ll create double invoices without idempotency. Always accept Idempotency-Key for “create” endpoints.
  • Assuming webhooks are reliable: they aren’t. Receivers go down. Network blips happen. Build delivery logging + retries.
  • Long tasks inside request handlers: you’ll time out under load. Return quickly and process asynchronously.
  • No observability: always log delivery attempts with IDs so you can answer “Did we send it?” in seconds.
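On the retry side, the demo's fixed 1s, 2s, 4s schedule means that many failed deliveries retry at the same moments. One common variation is "full jitter": pick a random delay inside an exponentially growing, capped window. The sketch below is illustrative only; `backoff_delays` and its `base` and `cap` parameters are hypothetical names, not part of the tutorial app.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_attempts: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Delays to wait between attempts, using full jitter over a capped exponential window."""
    if rng is None:
        rng = random.Random()
    delays: List[float] = []
    # There is one wait between each pair of attempts, so none after the final attempt.
    for retry in range(max_attempts - 1):
        window = min(cap, base * (2 ** retry))
        delays.append(rng.uniform(0, window))
    return delays
```

In a delivery loop you would sleep for `delays[attempt - 1]` after a failed attempt instead of doubling a fixed value. Passing a seeded `random.Random` makes the schedule reproducible in tests.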

Next steps you can implement today

If you want to extend this tutorial into a portfolio-ready mini-project, add:

  • POST /webhooks/subscriptions with event filters (only send specific event types)
  • HMAC signing (X-Signature) with a per-subscriber secret
  • Database persistence (SQLite/PostgreSQL) with unique constraints for idempotency keys
  • A “dead letter” view: deliveries that failed after max attempts
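As a starting point for the dead-letter view, here is a small sketch that filters the delivery log for deliveries that used up all their attempts without a success. The function name `dead_letters` is hypothetical; it assumes the log entry fields used in this guide (`delivery_id`, `attempt`, `status`) and the default of five attempts.

```python
from typing import Any, Dict, List, Set


def dead_letters(logs: List[Dict[str, Any]], max_attempts: int = 5) -> List[Dict[str, Any]]:
    """Return the last log entry of each delivery that exhausted its attempts unsuccessfully."""
    latest: Dict[str, Dict[str, Any]] = {}
    delivered: Set[str] = set()

    for entry in logs:
        delivery_id = entry["delivery_id"]
        if entry["status"] == "delivered":
            delivered.add(delivery_id)
        # Keep the entry with the highest attempt number per delivery.
        if delivery_id not in latest or entry["attempt"] > latest[delivery_id]["attempt"]:
            latest[delivery_id] = entry

    return [
        entry
        for delivery_id, entry in latest.items()
        if delivery_id not in delivered and entry["attempt"] >= max_attempts
    ]
```

You could expose this behind a route of your choosing (for example, a hypothetical GET /webhooks/dead-letters) that calls `dead_letters(WEBHOOK_DELIVERIES)`.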

With these patterns—background work, webhooks, idempotency—you’ll be able to build APIs that behave well under retries, failures, and real-world chaos, not just happy-path demos.

