FastAPI Webhooks in the Real World: Signature Verification, Idempotency, and Safe Async Processing

Webhooks look simple: a vendor sends an HTTP POST to your endpoint and you “do the thing.” In production, it gets messy fast: retries cause duplicates, payloads can be spoofed, and slow processing makes providers time out and retry even more.

This hands-on guide shows a practical FastAPI pattern for webhook handling that junior/mid developers can ship confidently:

  • Verify webhook signatures (HMAC) to block spoofed requests
  • Implement idempotency to avoid processing duplicates
  • Respond quickly (2xx) and process in the background safely
  • Keep a tiny audit trail for debugging

We’ll build a minimal webhook receiver you can adapt to Stripe/GitHub/Shopify/etc. The exact signature scheme differs per provider, but the structure stays the same.

Project setup

Install dependencies:

pip install fastapi uvicorn aiosqlite

Run the app:

uvicorn app:app --reload

We’ll use SQLite for idempotency + audit (simple, zero infra). If your app is multi-instance, use Redis/Postgres instead—same idea.

Webhook contract we’ll support

Our example provider sends:

  • Raw JSON body
  • X-Webhook-Timestamp (Unix seconds)
  • X-Webhook-Signature = HMAC-SHA256 of "{timestamp}.{raw_body}" using a shared secret
  • X-Event-Id unique per event (great for idempotency)

If your provider signs differently (e.g., Stripe uses its own scheme), you’ll adapt the compute_signature and header parsing, but the rest is reusable.
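
To make the contract concrete, here is a minimal standalone sketch of the signature math. The secret, timestamp, and body values are made up for illustration; the function mirrors the verification logic we'll use in the endpoint:

```python
import hashlib
import hmac

def compute_signature(secret: str, timestamp: str, raw_body: bytes) -> str:
    # HMAC-SHA256 over "{timestamp}.{raw_body}", hex-encoded
    signed_payload = timestamp.encode("utf-8") + b"." + raw_body
    return hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()

secret = "dev_secret_change_me"        # shared secret (illustrative)
timestamp = "1700000000"               # X-Webhook-Timestamp value
raw_body = b'{"type":"user.created"}'  # the exact request bytes

sig = compute_signature(secret, timestamp, raw_body)

# Verification is the same computation plus a constant-time compare:
print(hmac.compare_digest(sig, compute_signature(secret, timestamp, raw_body)))

# Any change to the bytes invalidates the signature:
print(hmac.compare_digest(sig, compute_signature(secret, timestamp, raw_body + b" ")))
```

The key property: the receiver recomputes the digest from the exact bytes it received and compares in constant time; any byte-level difference in timestamp or body yields a different digest.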

Core FastAPI webhook endpoint (secure + fast)

Create app.py:

import asyncio
import hashlib
import hmac
import json
import os
import time
from typing import Any, Dict, Optional

import aiosqlite
from fastapi import FastAPI, HTTPException, Request, status

app = FastAPI()

WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "dev_secret_change_me")
DB_PATH = os.environ.get("WEBHOOK_DB", "webhooks.db")

# A tiny in-process queue + worker.
# For serious workloads or multi-instance deployments, use Redis/Celery/RQ/Kafka.
queue: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue(maxsize=1000)


def compute_signature(secret: str, timestamp: str, raw_body: bytes) -> str:
    """
    Provider signs the string "{timestamp}.{raw_body}" and sends
    the hex-encoded HMAC-SHA256.
    """
    signed_payload = timestamp.encode("utf-8") + b"." + raw_body
    return hmac.new(secret.encode("utf-8"), signed_payload, hashlib.sha256).hexdigest()


def constant_time_equals(a: str, b: str) -> bool:
    # Prevent timing attacks
    return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))


async def init_db() -> None:
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            """
            CREATE TABLE IF NOT EXISTS webhook_events (
                event_id TEXT PRIMARY KEY,
                received_at INTEGER NOT NULL,
                status TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                error TEXT
            );
            """
        )
        await db.commit()


@app.on_event("startup")
async def startup() -> None:
    await init_db()
    asyncio.create_task(worker_loop())


async def worker_loop() -> None:
    while True:
        job = await queue.get()
        try:
            await process_event(job["event_id"], job["payload"])
            await mark_status(job["event_id"], "processed", None)
        except Exception as e:
            await mark_status(job["event_id"], "failed", str(e))
        finally:
            queue.task_done()


async def mark_status(event_id: str, status_value: str, error: Optional[str]) -> None:
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "UPDATE webhook_events SET status = ?, error = ? WHERE event_id = ?",
            (status_value, error, event_id),
        )
        await db.commit()


async def insert_if_new(event_id: str, payload: Dict[str, Any]) -> bool:
    """
    Returns True if inserted (new event).
    Returns False if it already exists (duplicate/retry).
    """
    now = int(time.time())
    payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
    try:
        async with aiosqlite.connect(DB_PATH) as db:
            await db.execute(
                "INSERT INTO webhook_events (event_id, received_at, status, payload_json) "
                "VALUES (?, ?, ?, ?)",
                (event_id, now, "queued", payload_json),
            )
            await db.commit()
        return True
    except aiosqlite.IntegrityError:
        # event_id already exists
        return False


async def process_event(event_id: str, payload: Dict[str, Any]) -> None:
    """
    Your real business logic goes here. Keep it idempotent:
    running twice should not double-charge, double-email, etc.
    """
    event_type = payload.get("type")
    data = payload.get("data", {})

    # Example: handle a couple of event types
    if event_type == "user.created":
        user_id = data["id"]
        email = data["email"]
        await asyncio.sleep(0.2)  # simulate work
        print(f"[{event_id}] created user {user_id} ({email})")
    elif event_type == "invoice.paid":
        invoice_id = data["id"]
        amount = data["amount"]
        await asyncio.sleep(0.2)
        print(f"[{event_id}] invoice paid {invoice_id} amount={amount}")
    else:
        # Unknown events should not crash the system.
        # Store them and treat as processed (or route to a dead-letter queue).
        print(f"[{event_id}] ignored event type={event_type}")


@app.post("/webhooks/provider")
async def provider_webhook(request: Request):
    # 1) Read raw body (signature verification requires the exact bytes)
    raw_body = await request.body()

    # 2) Get headers
    timestamp = request.headers.get("X-Webhook-Timestamp")
    signature = request.headers.get("X-Webhook-Signature")
    event_id = request.headers.get("X-Event-Id")
    if not timestamp or not signature or not event_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing required headers")

    # 3) Basic replay protection: reject very old timestamps.
    #    Adjust the window to your provider's retry behavior (e.g., 5 minutes).
    try:
        ts_int = int(timestamp)
    except ValueError:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid timestamp")
    if abs(int(time.time()) - ts_int) > 300:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Stale timestamp")

    # 4) Verify signature (constant-time compare)
    expected = compute_signature(WEBHOOK_SECRET, timestamp, raw_body)
    if not constant_time_equals(expected, signature):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid signature")

    # 5) Parse JSON only after the signature passes
    try:
        payload = json.loads(raw_body.decode("utf-8"))
    except json.JSONDecodeError:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON")

    # 6) Idempotency: record event_id first, before doing any work
    is_new = await insert_if_new(event_id, payload)
    if not is_new:
        # Important: respond 2xx so the provider stops retrying.
        return {"ok": True, "duplicate": True}

    # 7) Enqueue for background processing; respond quickly
    try:
        queue.put_nowait({"event_id": event_id, "payload": payload})
    except asyncio.QueueFull:
        # If the queue is full, returning 503 may prompt retries.
        # Alternatively, return 202 but mark as failed and alert.
        await mark_status(event_id, "failed", "queue_full")
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Busy")

    return {"ok": True, "queued": True}

This endpoint is production-shaped:

  • It verifies signatures against the raw bytes of the body.
  • It includes a timestamp window to reduce replay attacks.
  • It stores event_id first to dedupe provider retries.
  • It responds fast and processes asynchronously to avoid timeouts.

Try it locally with a “fake provider” request

In another terminal, craft a signed request. (On a real provider you won’t do this manually, but it helps validate your implementation.)

python - <<'PY'
import hmac, hashlib, json, time
import requests

secret = "dev_secret_change_me"
timestamp = str(int(time.time()))
payload = {"type": "user.created", "data": {"id": "u_123", "email": "[email protected]"}}
raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")

to_sign = timestamp.encode() + b"." + raw
sig = hmac.new(secret.encode(), to_sign, hashlib.sha256).hexdigest()

headers = {
    "X-Webhook-Timestamp": timestamp,
    "X-Webhook-Signature": sig,
    "X-Event-Id": "evt_001",
}

r = requests.post("http://127.0.0.1:8000/webhooks/provider", data=raw, headers=headers)
print(r.status_code, r.text)

# Send again with the same event id => should be deduped
r2 = requests.post("http://127.0.0.1:8000/webhooks/provider", data=raw, headers=headers)
print(r2.status_code, r2.text)
PY

You should see "queued": true the first time and "duplicate": true the second time. Your app logs will print the processed event once.

Make processing truly safe (the “outbox” mindset)

The in-memory queue works for learning and small internal tools. In production, consider what happens if the server restarts after returning 200 but before the job is processed.

To reduce lost work, use the database record as a source of truth:

  • Insert the event with status="queued" (we already do this)
  • Have a separate worker process poll the DB for queued events
  • Mark processing then processed (or failed)

This is the same spirit as the “transactional outbox” pattern: store the fact that work needs to happen, then perform it reliably.

Here’s a simple polling worker you can run as a separate process (still SQLite, still minimal). Add this to app.py and gate it behind an environment variable so that only the worker process runs the loop:

async def poll_db_for_jobs() -> None:
    while True:
        async with aiosqlite.connect(DB_PATH) as db:
            db.row_factory = aiosqlite.Row
            rows = await db.execute_fetchall(
                "SELECT event_id, payload_json FROM webhook_events "
                "WHERE status = 'queued' ORDER BY received_at LIMIT 50"
            )
            for row in rows:
                event_id = row["event_id"]
                payload = json.loads(row["payload_json"])
                await db.execute(
                    "UPDATE webhook_events SET status = 'processing' "
                    "WHERE event_id = ? AND status = 'queued'",
                    (event_id,),
                )
                await db.commit()
                try:
                    await process_event(event_id, payload)
                    await mark_status(event_id, "processed", None)
                except Exception as e:
                    await mark_status(event_id, "failed", str(e))
        await asyncio.sleep(1.0)

With this approach, even if your API instance restarts, queued events remain in the database and will be picked up.

Operational tips you’ll thank yourself for later

  • Always return 2xx for duplicates. Providers retry when they don’t see success. Your idempotency layer is the safety net.
  • Log with the event id. Include event_id in every log line. It’s your breadcrumb trail.
  • Store raw payloads (carefully). Keeping payload_json helps debugging, but consider PII and retention policies.
  • Make handlers idempotent too. Even with dedupe, you can still see duplicates if the provider changes IDs or you reprocess failed jobs.
  • Use a real queue when scaling. For multiple instances, move from in-memory queue to Redis/Celery/RQ or a message broker.
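
As a concrete example of "make handlers idempotent too": guard each business side effect with its own unique key, so reprocessing becomes a no-op. Here is a minimal sketch using stdlib sqlite3 (the table, column names, and email scenario are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sent_emails (event_id TEXT PRIMARY KEY, recipient TEXT)")

def send_welcome_email(event_id: str, recipient: str) -> bool:
    """Returns True if the email was sent, False if this event was already handled."""
    cur = conn.execute(
        # INSERT OR IGNORE is a no-op when event_id already exists
        "INSERT OR IGNORE INTO sent_emails (event_id, recipient) VALUES (?, ?)",
        (event_id, recipient),
    )
    conn.commit()
    if cur.rowcount == 0:
        return False  # already recorded: skip the side effect
    # ... actually send the email here ...
    return True

print(send_welcome_email("evt_001", "[email protected]"))  # True: first time, email goes out
print(send_welcome_email("evt_001", "[email protected]"))  # False: replay, no duplicate email
```

The same shape works with Postgres (ON CONFLICT DO NOTHING) or Redis (SET NX): record the effect's key first, and only perform the effect if the record was new.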

Common gotchas

  • Reading the body twice. In FastAPI/Starlette, once you await request.body(), reuse that value. Don’t rely on a second read.
  • Signing parsed JSON. Providers sign raw bytes, not your re-serialized JSON. Verify before parsing.
  • Clock drift. Timestamp replay windows require reasonably accurate server time (NTP).
  • Assuming “processed” means “success.” Decide what “processed” means for your business and monitor failure rates.
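
The "signing parsed JSON" gotcha is easy to demonstrate: re-serializing a parsed payload usually changes the bytes, so the digest no longer matches. A small sketch (the timestamp prefix is omitted for brevity):

```python
import hashlib
import hmac
import json

secret = b"dev_secret_change_me"
raw = b'{"type":"invoice.paid","data":{"id":"in_1"}}'  # exact bytes from the wire

sig_raw = hmac.new(secret, raw, hashlib.sha256).hexdigest()

# json.dumps adds spaces after ':' and ',' by default, so the bytes differ
reserialized = json.dumps(json.loads(raw)).encode("utf-8")
sig_reserialized = hmac.new(secret, reserialized, hashlib.sha256).hexdigest()

print(raw != reserialized)          # True: different bytes
print(sig_raw != sig_reserialized)  # True: verification would fail
```

Whitespace, key order, Unicode escaping, and float formatting can all shift during a round-trip, which is why the endpoint verifies against request.body() before parsing.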

Wrap-up

A reliable webhook handler is less about fancy frameworks and more about discipline: verify authenticity, dedupe retries, respond fast, and process safely. The FastAPI pattern above gives you a clean baseline you can evolve—from SQLite to Postgres, from an in-process queue to Redis, from print logs to structured observability—without changing the core contract.

A natural next step is a variant that uses Redis for idempotency plus a Celery worker (a common production combo) while keeping the same endpoint shape.
