FastAPI Background Jobs, Done Right: From “Fire-and-Forget” to Reliable Work Queues (Hands-On)
FastAPI makes it easy to build APIs, but real apps quickly need work that shouldn’t run inside the request: sending emails, generating PDFs, resizing images, calling slow third-party APIs, or crunching reports. A common junior mistake is to “just do it after returning” or to use BackgroundTasks for everything—only to discover jobs vanish on deploy, duplicate during retries, or slow down the app under load.
This article shows a practical progression:
- When `BackgroundTasks` is OK (and when it isn’t)
- How to design an “outbox” table so jobs don’t get lost
- How to run a simple worker loop (no extra infrastructure)
- How to make jobs idempotent and observable
All examples are minimal, but the patterns scale.
1) Level 0: The “just do it” anti-pattern
Imagine an endpoint that creates an order and then emails a receipt.
```python
# ❌ Don't do this (blocks request, fragile)
@app.post("/orders")
def create_order(payload: OrderIn):
    order = save_order(payload)   # writes to DB
    send_receipt_email(order.id)  # slow I/O, can fail
    return {"id": order.id}
```
This blocks the request, adds latency, and makes the request fail if email fails—even though the order is already saved. Worse: clients might retry, creating duplicates.
2) Level 1: FastAPI BackgroundTasks (good for small, in-process tasks)
BackgroundTasks runs after the response is returned, inside the same server process.
```python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_receipt_email(order_id: int) -> None:
    # call email provider, render template, etc.
    print(f"Sending receipt for order {order_id}")

@app.post("/orders")
def create_order(payload: dict, background: BackgroundTasks):
    order_id = 123  # pretend DB insert
    background.add_task(send_receipt_email, order_id)
    return {"id": order_id, "status": "created"}
```
When this is acceptable:
- Non-critical side effects (best-effort)
- Very fast tasks (milliseconds to a couple seconds)
- You can tolerate losing tasks during restarts
When it is NOT acceptable:
- Any “must happen” job (billing, compliance, receipts, data exports)
- Long-running tasks
- Multi-worker deployments where you need job coordination
Because it’s in-process, if your container restarts right after responding, the background task may never run.
3) Level 2: The Outbox pattern (reliability without external queues)
To make background work reliable, persist “intent to do work” in your database. The API request writes:
- your business data (e.g., the order)
- a job record describing what needs to happen (the outbox)
Then a worker reads pending jobs and executes them. If the app restarts, jobs remain in the DB.
3.1 Create a tiny outbox table
Here’s a simple schema (PostgreSQL or MySQL-friendly). You can run it as a migration later—start with SQL:
```sql
-- jobs_outbox
CREATE TABLE jobs_outbox (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    job_type VARCHAR(100) NOT NULL,
    payload_json TEXT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',  -- pending|processing|done|failed
    attempts INT NOT NULL DEFAULT 0,
    max_attempts INT NOT NULL DEFAULT 5,
    run_after TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    locked_at TIMESTAMP NULL,
    last_error TEXT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_jobs_status_run_after ON jobs_outbox(status, run_after);
```
Key ideas:
- `status` + `run_after` lets you schedule retries
- `attempts` / `max_attempts` limits retry storms
- `locked_at` prevents multiple workers from doing the same job
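To make “schedule retries” concrete, here is a sketch of the exponential backoff the worker later in this article uses when bumping `run_after` (the 60-second cap is an arbitrary tutorial choice, not a recommendation):

```python
# Exponential backoff with a cap: the delay doubles per attempt,
# so transient failures retry quickly but persistent ones back off.
def backoff_seconds(attempts: int, cap: int = 60) -> int:
    return min(cap, 2 ** attempts)

# Delays (in seconds) applied before retries 1 through 7
schedule = [backoff_seconds(a) for a in range(1, 8)]
print(schedule)  # [2, 4, 8, 16, 32, 60, 60]
```

With `max_attempts = 5`, a job that keeps failing is retried within about a minute total before landing in `failed`.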
3.2 Write the API endpoint to enqueue jobs transactionally
Below is a working example using SQLAlchemy Core for clarity. It uses SQLite for local testing, but the pattern is the same for Postgres/MySQL. (In production, use Postgres/MySQL for concurrency.)
```python
import json
from datetime import datetime

from fastapi import FastAPI
from sqlalchemy import (
    create_engine, MetaData, Table, Column, Integer, String, Text,
    DateTime, select, insert,
)

app = FastAPI()
engine = create_engine("sqlite:///./app.db", future=True)
meta = MetaData()

jobs = Table(
    "jobs_outbox", meta,
    Column("id", Integer, primary_key=True),
    Column("job_type", String, nullable=False),
    Column("payload_json", Text, nullable=False),
    Column("status", String, nullable=False, default="pending"),
    Column("attempts", Integer, nullable=False, default=0),
    Column("max_attempts", Integer, nullable=False, default=5),
    Column("run_after", DateTime, nullable=False, default=datetime.utcnow),
    Column("locked_at", DateTime, nullable=True),
    Column("last_error", Text, nullable=True),
    Column("created_at", DateTime, nullable=False, default=datetime.utcnow),
    Column("updated_at", DateTime, nullable=False, default=datetime.utcnow),
)
meta.create_all(engine)

@app.post("/orders")
def create_order(payload: dict):
    # Imagine this saves the order in the same DB transaction.
    order_id = 123
    job_payload = {"order_id": order_id, "to": payload.get("email")}
    with engine.begin() as conn:
        conn.execute(
            insert(jobs).values(
                job_type="send_receipt_email",
                payload_json=json.dumps(job_payload),
                status="pending",
                run_after=datetime.utcnow(),
                created_at=datetime.utcnow(),
                updated_at=datetime.utcnow(),
            )
        )
    return {"id": order_id, "status": "created", "queued": True}
```
Now your API can return quickly and safely: the job request is persisted.
4) A simple worker loop that claims jobs safely
The worker needs to:
- Fetch a pending job ready to run
- Atomically “lock” it (mark as `processing`) so other workers don’t pick it up
- Execute the job
- Mark `done` or schedule a retry
For Postgres, you’d typically use `SELECT ... FOR UPDATE SKIP LOCKED`. To keep this tutorial runnable everywhere, we’ll use a simpler “optimistic lock” update: update the row only if it’s still pending and unlocked.
```python
# worker.py
import json
import time
from datetime import datetime, timedelta

from sqlalchemy import select, update

from app import engine, jobs  # the module above that defines the table

def send_receipt_email(job_payload: dict) -> None:
    # Replace with your real email integration
    order_id = job_payload["order_id"]
    to = job_payload.get("to") or "unknown@example.com"
    print(f"[email] sending receipt: order={order_id} to={to}")

def run_worker(poll_seconds: float = 1.0) -> None:
    print("Worker started. Ctrl+C to stop.")
    while True:
        with engine.begin() as conn:
            # 1) Find one pending job ready to run
            row = conn.execute(
                select(
                    jobs.c.id, jobs.c.job_type, jobs.c.payload_json,
                    jobs.c.attempts, jobs.c.max_attempts,
                )
                .where(jobs.c.status == "pending")
                .where(jobs.c.run_after <= datetime.utcnow())
                .order_by(jobs.c.id.asc())
                .limit(1)
            ).mappings().first()

            if not row:
                pass  # nothing to do
            else:
                job_id = row["id"]

                # 2) Try to claim it (atomic-ish): the update only matches
                # if the row is still pending, so a second worker gets rowcount 0
                claimed = conn.execute(
                    update(jobs)
                    .where(jobs.c.id == job_id)
                    .where(jobs.c.status == "pending")
                    .values(
                        status="processing",
                        locked_at=datetime.utcnow(),
                        updated_at=datetime.utcnow(),
                    )
                ).rowcount
                if claimed != 1:
                    continue  # another worker got it

                # 3) Execute the job
                try:
                    payload = json.loads(row["payload_json"])
                    if row["job_type"] == "send_receipt_email":
                        send_receipt_email(payload)
                    else:
                        raise ValueError(f"Unknown job_type: {row['job_type']}")

                    # 4) Mark done
                    conn.execute(
                        update(jobs)
                        .where(jobs.c.id == job_id)
                        .values(status="done", updated_at=datetime.utcnow())
                    )
                except Exception as e:
                    # 5) Retry with backoff, or give up after max_attempts
                    attempts = row["attempts"] + 1
                    if attempts >= row["max_attempts"]:
                        conn.execute(
                            update(jobs)
                            .where(jobs.c.id == job_id)
                            .values(
                                status="failed",
                                attempts=attempts,
                                last_error=str(e),
                                updated_at=datetime.utcnow(),
                            )
                        )
                    else:
                        backoff = min(60, 2 ** attempts)  # cap at 60s
                        conn.execute(
                            update(jobs)
                            .where(jobs.c.id == job_id)
                            .values(
                                status="pending",
                                attempts=attempts,
                                last_error=str(e),
                                run_after=datetime.utcnow() + timedelta(seconds=backoff),
                                locked_at=None,
                                updated_at=datetime.utcnow(),
                            )
                        )

        time.sleep(poll_seconds)

if __name__ == "__main__":
    run_worker()
```
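For reference, on Postgres the “find” and “claim” steps collapse into a single statement with `SKIP LOCKED`, so concurrent workers never fight over the same row. A sketch (not runnable on the SQLite setup used in this tutorial):

```sql
-- Atomically claim one ready job; other workers skip rows we hold locked.
UPDATE jobs_outbox
SET status = 'processing',
    locked_at = now(),
    updated_at = now()
WHERE id = (
    SELECT id
    FROM jobs_outbox
    WHERE status = 'pending' AND run_after <= now()
    ORDER BY id
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, job_type, payload_json, attempts, max_attempts;
```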
How to run locally:
- Start the API: `uvicorn app:app --reload`
- In another terminal, start the worker: `python worker.py`
- POST to `/orders` with an `email` field and watch the worker logs
5) Make jobs idempotent (so retries don’t cause duplicates)
Retries are normal: network hiccups, provider rate limits, timeouts. If your worker runs a job twice, your system must still be correct.
A common technique is an idempotency key. For example, “email receipt for order 123” should only be sent once. You can enforce this with a unique constraint in a log table:
```sql
CREATE TABLE job_effects (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    effect_key VARCHAR(200) NOT NULL UNIQUE,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
```
Then in your job handler:
```python
from sqlalchemy import text

def send_receipt_email_idempotent(conn, order_id: int, to: str) -> None:
    effect_key = f"receipt_email:{order_id}"
    # Try to record the effect. If it already exists, skip sending.
    try:
        conn.execute(
            text("INSERT INTO job_effects(effect_key) VALUES (:k)"),
            {"k": effect_key},
        )
    except Exception:
        # In Postgres you'd catch UniqueViolation; simplified here:
        print(f"[email] already sent receipt for order={order_id}, skipping")
        return
    print(f"[email] sending receipt ONCE: order={order_id} to={to}")
```
This way, even if the job runs twice, the email sends once.
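You can verify the mechanism without any email provider at all. Here is a self-contained sketch using SQLite’s `PRIMARY KEY` as the unique constraint; the `sent` list stands in for the real email call:

```python
import sqlite3

# The UNIQUE/PRIMARY KEY constraint makes "record the effect" succeed
# exactly once, so a retried job skips the side effect instead of repeating it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_effects (effect_key TEXT PRIMARY KEY)")

sent = []  # stand-in for the real email integration

def send_receipt_once(order_id: int) -> None:
    key = f"receipt_email:{order_id}"
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO job_effects(effect_key) VALUES (?)", (key,))
    except sqlite3.IntegrityError:
        return  # effect already recorded; retry becomes a no-op
    sent.append(order_id)

send_receipt_once(123)
send_receipt_once(123)  # simulated retry
print(len(sent))  # 1
```

The second call hits the constraint, returns early, and the side effect happens once.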
6) Add observability juniors actually use
If background jobs are invisible, they become a nightmare. Minimum practical observability:
- Log every job start/end with `job_id` and `job_type`
- Expose a basic admin endpoint to inspect job status
- Track counts of `pending`/`failed` (even if just SQL)
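The status counts are one `GROUP BY` away. A minimal sketch against an in-memory SQLite database (the same query works unchanged on Postgres/MySQL against your real `jobs_outbox` table):

```python
import sqlite3

# Seed a toy outbox table so the rollup query has something to count.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs_outbox (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO jobs_outbox(status) VALUES (?)",
    [("pending",), ("pending",), ("done",), ("failed",)],
)

# One query gives you the dashboard numbers: how much work is queued or stuck.
counts = dict(
    conn.execute("SELECT status, COUNT(*) FROM jobs_outbox GROUP BY status")
)
print(counts)  # {'done': 1, 'failed': 1, 'pending': 2}
```

A spike in `failed` or a growing `pending` count is usually the first sign of an incident.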
Example inspection endpoint:
```python
from fastapi import HTTPException

@app.get("/jobs/{job_id}")
def get_job(job_id: int):
    with engine.connect() as conn:
        row = conn.execute(
            select(jobs).where(jobs.c.id == job_id)
        ).mappings().first()
    if not row:
        raise HTTPException(404, "Job not found")
    return {
        "id": row["id"],
        "job_type": row["job_type"],
        "status": row["status"],
        "attempts": row["attempts"],
        "run_after": row["run_after"],
        "last_error": row["last_error"],
    }
```
This alone saves hours during incidents.
7) When to graduate to a real queue (Celery/RQ/Arq/Sidekiq style)
The DB-outbox worker is great when you want reliability without extra infrastructure, but you should move to a dedicated queue when:
- You need high throughput and many workers
- You want delayed jobs, priorities, rate limits, or cron schedules built-in
- You need distributed locking and robust job leasing
- You’re hitting DB contention from polling
The good news: if you designed your jobs as idempotent handlers with clear job_type + payload, migrating to a queue later is mostly wiring.
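One concrete way to keep that migration cheap is to register handlers in a `job_type` → function table, so dispatch is decoupled from the transport. A hypothetical sketch (the `handler` decorator and `dispatch` helper are illustrative names, not part of any library):

```python
from typing import Callable

# Registry mapping job_type strings to handler functions. The DB worker
# (or later, a Celery/RQ task) only needs this table to dispatch work.
HANDLERS: dict[str, Callable[[dict], None]] = {}

def handler(job_type: str):
    def register(fn: Callable[[dict], None]):
        HANDLERS[job_type] = fn
        return fn
    return register

results = []  # stand-in for real side effects, so the sketch is observable

@handler("send_receipt_email")
def send_receipt_email(payload: dict) -> None:
    results.append(("email", payload["order_id"]))

def dispatch(job_type: str, payload: dict) -> None:
    fn = HANDLERS.get(job_type)
    if fn is None:
        raise ValueError(f"Unknown job_type: {job_type}")
    fn(payload)

dispatch("send_receipt_email", {"order_id": 123})
print(results)  # [('email', 123)]
```

Swapping the DB poller for a real queue then means changing only who calls `dispatch`, not the handlers themselves.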
Key takeaways
- `BackgroundTasks` is convenient, but it’s not a durable queue.
- Persist jobs in an outbox table to avoid losing work on restarts.
- Claim jobs safely, retry with backoff, and cap attempts.
- Make handlers idempotent so retries don’t duplicate side effects.
- Add a tiny “jobs status” endpoint so debugging isn’t guesswork.