FastAPI Background Jobs, Done Right: From “Fire-and-Forget” to Reliable Work Queues (Hands-On)

FastAPI makes it easy to build APIs, but real apps quickly need work that shouldn’t run inside the request: sending emails, generating PDFs, resizing images, calling slow third-party APIs, or crunching reports. A common junior mistake is to “just do it after returning” or to use BackgroundTasks for everything—only to discover jobs vanish on deploy, duplicate during retries, or slow down the app under load.

This article shows a practical progression:

  • When BackgroundTasks is OK (and when it isn’t)
  • How to design an “outbox” table so jobs don’t get lost
  • How to run a simple worker loop (no extra infrastructure)
  • How to make jobs idempotent and observable

All examples are minimal, but the patterns scale.

1) Level 0: The “just do it” anti-pattern

Imagine an endpoint that creates an order and then emails a receipt.

# ❌ Don't do this (blocks request, fragile)
@app.post("/orders")
def create_order(payload: OrderIn):
    order = save_order(payload)   # writes to DB
    send_receipt_email(order.id)  # slow I/O, can fail
    return {"id": order.id}

This blocks the request, adds latency, and makes the request fail if email fails—even though the order is already saved. Worse: clients might retry, creating duplicates.

2) Level 1: FastAPI BackgroundTasks (good for small, in-process tasks)

BackgroundTasks runs after the response is returned, inside the same server process.

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_receipt_email(order_id: int) -> None:
    # call email provider, render template, etc.
    print(f"Sending receipt for order {order_id}")

@app.post("/orders")
def create_order(payload: dict, background: BackgroundTasks):
    order_id = 123  # pretend DB insert
    background.add_task(send_receipt_email, order_id)
    return {"id": order_id, "status": "created"}

When this is acceptable:

  • Non-critical side effects (best-effort)
  • Very fast tasks (milliseconds to a couple seconds)
  • You can tolerate losing tasks during restarts

When it is NOT acceptable:

  • Any “must happen” job (billing, compliance, receipts, data exports)
  • Long-running tasks
  • Multi-worker deployments where you need job coordination

Because it’s in-process, if your container restarts right after responding, the background task may never run.

3) Level 2: The Outbox pattern (reliability without external queues)

To make background work reliable, persist “intent to do work” in your database. The API request writes:

  • your business data (e.g., the order)
  • a job record describing what needs to happen (the outbox)

Then a worker reads pending jobs and executes them. If the app restarts, jobs remain in the DB.

3.1 Create a tiny outbox table

Here’s a simple schema (PostgreSQL syntax; for MySQL, swap the identity column for AUTO_INCREMENT). You can run it as a migration later; start with the SQL:

-- jobs_outbox
CREATE TABLE jobs_outbox (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    job_type VARCHAR(100) NOT NULL,
    payload_json TEXT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',  -- pending|processing|done|failed
    attempts INT NOT NULL DEFAULT 0,
    max_attempts INT NOT NULL DEFAULT 5,
    run_after TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    locked_at TIMESTAMP NULL,
    last_error TEXT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_jobs_status_run_after ON jobs_outbox(status, run_after);

Key ideas:

  • status + run_after lets you schedule retries
  • attempts/max_attempts limits retry storms
  • locked_at prevents multiple workers from doing the same job

3.2 Write the API endpoint to enqueue jobs transactionally

Below is a working example using SQLAlchemy Core for clarity. It uses SQLite for local testing, but the pattern is the same for Postgres/MySQL. (In production, use Postgres/MySQL for concurrency.)

from datetime import datetime
import json

from fastapi import FastAPI
from sqlalchemy import (
    create_engine, MetaData, Table, Column, Integer, String, Text,
    DateTime, select, insert
)

app = FastAPI()
engine = create_engine("sqlite:///./app.db", future=True)
meta = MetaData()

jobs = Table(
    "jobs_outbox", meta,
    Column("id", Integer, primary_key=True),
    Column("job_type", String, nullable=False),
    Column("payload_json", Text, nullable=False),
    Column("status", String, nullable=False, default="pending"),
    Column("attempts", Integer, nullable=False, default=0),
    Column("max_attempts", Integer, nullable=False, default=5),
    Column("run_after", DateTime, nullable=False, default=datetime.utcnow),
    Column("locked_at", DateTime, nullable=True),
    Column("last_error", Text, nullable=True),
    Column("created_at", DateTime, nullable=False, default=datetime.utcnow),
    Column("updated_at", DateTime, nullable=False, default=datetime.utcnow),
)

meta.create_all(engine)

@app.post("/orders")
def create_order(payload: dict):
    # Imagine this saves the order in the same DB transaction.
    order_id = 123
    job_payload = {"order_id": order_id, "to": payload.get("email")}
    with engine.begin() as conn:
        conn.execute(
            insert(jobs).values(
                job_type="send_receipt_email",
                payload_json=json.dumps(job_payload),
                status="pending",
                run_after=datetime.utcnow(),
                created_at=datetime.utcnow(),
                updated_at=datetime.utcnow(),
            )
        )
    return {"id": order_id, "status": "created", "queued": True}

Now your API can return quickly and safely: the job request is persisted.

4) A simple worker loop that claims jobs safely

The worker needs to:

  • Fetch a pending job ready to run
  • Atomically “lock” it (mark as processing) so other workers don’t pick it up
  • Execute the job
  • Mark done or schedule a retry

For Postgres, you’d typically use SELECT ... FOR UPDATE SKIP LOCKED. To keep this tutorial runnable everywhere, we’ll use a simpler “optimistic lock” update: update the row only if it’s still pending and unlocked.
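For reference, the Postgres-only claim step typically looks roughly like this (same jobs_outbox table as above; a sketch, not tested in this tutorial):

```sql
-- Claim one ready job, skipping rows another worker already holds locked.
UPDATE jobs_outbox
SET status = 'processing',
    locked_at = now(),
    updated_at = now()
WHERE id = (
    SELECT id
    FROM jobs_outbox
    WHERE status = 'pending'
      AND run_after <= now()
    ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, job_type, payload_json;
```

Because SKIP LOCKED never blocks on rows other transactions hold, many workers can poll concurrently without stepping on each other.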

import time
from datetime import datetime, timedelta
import json

from sqlalchemy import update

def send_receipt_email(job_payload: dict) -> None:
    # Replace with your real email integration
    order_id = job_payload["order_id"]
    to = job_payload.get("to") or "unknown@example.com"
    print(f"[email] sending receipt: order={order_id} to={to}")

def run_worker(poll_seconds: float = 1.0) -> None:
    print("Worker started. Ctrl+C to stop.")
    while True:
        with engine.begin() as conn:
            # 1) Find one pending job ready to run
            row = conn.execute(
                select(
                    jobs.c.id, jobs.c.job_type, jobs.c.payload_json,
                    jobs.c.attempts, jobs.c.max_attempts,
                )
                .where(jobs.c.status == "pending")
                .where(jobs.c.run_after <= datetime.utcnow())
                .order_by(jobs.c.id.asc())
                .limit(1)
            ).mappings().first()

            if not row:
                pass  # nothing to do; fall through to sleep
            else:
                job_id = row["id"]

                # 2) Try to claim it: the update only succeeds if the row
                #    is still pending (optimistic lock)
                claimed = conn.execute(
                    update(jobs)
                    .where(jobs.c.id == job_id)
                    .where(jobs.c.status == "pending")
                    .values(
                        status="processing",
                        locked_at=datetime.utcnow(),
                        updated_at=datetime.utcnow(),
                    )
                ).rowcount
                if claimed != 1:
                    continue  # another worker got it

                # 3) Execute the job
                try:
                    payload = json.loads(row["payload_json"])
                    if row["job_type"] == "send_receipt_email":
                        send_receipt_email(payload)
                    else:
                        raise ValueError(f"Unknown job_type: {row['job_type']}")

                    # 4) Mark done
                    conn.execute(
                        update(jobs)
                        .where(jobs.c.id == job_id)
                        .values(status="done", updated_at=datetime.utcnow())
                    )
                except Exception as e:
                    # 5) Retry with exponential backoff
                    attempts = row["attempts"] + 1
                    if attempts >= row["max_attempts"]:
                        conn.execute(
                            update(jobs)
                            .where(jobs.c.id == job_id)
                            .values(
                                status="failed",
                                attempts=attempts,
                                last_error=str(e),
                                updated_at=datetime.utcnow(),
                            )
                        )
                    else:
                        backoff = min(60, 2 ** attempts)  # cap at 60s
                        conn.execute(
                            update(jobs)
                            .where(jobs.c.id == job_id)
                            .values(
                                status="pending",
                                attempts=attempts,
                                last_error=str(e),
                                run_after=datetime.utcnow() + timedelta(seconds=backoff),
                                locked_at=None,
                                updated_at=datetime.utcnow(),
                            )
                        )
        time.sleep(poll_seconds)

if __name__ == "__main__":
    run_worker()

How to run locally:

  • Start the API: uvicorn app:app --reload
  • In another terminal, start the worker: python worker.py
  • POST to /orders with an email field and watch the worker logs

5) Make jobs idempotent (so retries don’t cause duplicates)

Retries are normal: network hiccups, provider rate limits, timeouts. If your worker runs a job twice, your system must still be correct.

A common technique is an idempotency key. For example, “email receipt for order 123” should only be sent once. You can enforce this with a unique constraint in a log table:

CREATE TABLE job_effects (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    effect_key VARCHAR(200) NOT NULL UNIQUE,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Then in your job handler:

from sqlalchemy import text

def send_receipt_email_idempotent(conn, order_id: int, to: str) -> None:
    effect_key = f"receipt_email:{order_id}"
    # Try to record the effect. If it already exists, skip sending.
    try:
        conn.execute(
            text("INSERT INTO job_effects(effect_key) VALUES (:k)"),
            {"k": effect_key},
        )
    except Exception:
        # In Postgres you'd catch UniqueViolation; simplified here:
        print(f"[email] already sent receipt for order={order_id}, skipping")
        return
    print(f"[email] sending receipt ONCE: order={order_id} to={to}")

This way, even if the job runs twice, the email sends once.
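You can see the guarantee end to end with a tiny standalone demo using an in-memory SQLite database (the names below are illustrative, not the article's schema):

```python
import sqlite3

# In-memory effects table with a unique key, mirroring job_effects above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_effects (effect_key TEXT PRIMARY KEY)")

sent = []  # stand-in for the real email provider

def send_receipt_email_once(order_id: int, to: str) -> bool:
    key = f"receipt_email:{order_id}"
    try:
        # The second INSERT with the same key violates the PRIMARY KEY.
        conn.execute("INSERT INTO job_effects(effect_key) VALUES (?)", (key,))
        conn.commit()
    except sqlite3.IntegrityError:
        return False  # effect already recorded; skip the send
    sent.append((order_id, to))
    return True

first = send_receipt_email_once(123, "a@example.com")   # sends
second = send_receipt_email_once(123, "a@example.com")  # skipped
```

Running the handler twice records exactly one send: the database, not application logic, enforces the "only once" rule.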

6) Add observability juniors actually use

If background jobs are invisible, they become a nightmare. Minimum practical observability:

  • Log every job start/end with job_id and job_type
  • Expose a basic admin endpoint to inspect job status
  • Track counts of pending/failed (even if just SQL)
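For the last bullet, two plain SQL queries against jobs_outbox cover most day-to-day checks:

```sql
-- Quick health check: how many jobs are in each state?
SELECT status, COUNT(*) AS n
FROM jobs_outbox
GROUP BY status;

-- Oldest pending job; a growing age here usually means the worker is stuck or down.
SELECT MIN(run_after)
FROM jobs_outbox
WHERE status = 'pending';
```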

Example inspection endpoint:

from fastapi import HTTPException

@app.get("/jobs/{job_id}")
def get_job(job_id: int):
    with engine.connect() as conn:
        row = conn.execute(
            select(jobs).where(jobs.c.id == job_id)
        ).mappings().first()
    if not row:
        raise HTTPException(404, "Job not found")
    return {
        "id": row["id"],
        "job_type": row["job_type"],
        "status": row["status"],
        "attempts": row["attempts"],
        "run_after": row["run_after"],
        "last_error": row["last_error"],
    }

This alone saves hours during incidents.

7) When to graduate to a real queue (Celery/RQ/Arq/Sidekiq style)

The DB-outbox worker is great when you want reliability without extra infrastructure, but you should move to a dedicated queue when:

  • You need high throughput and many workers
  • You want delayed jobs, priorities, rate limits, or cron schedules built-in
  • You need distributed locking and robust job leasing
  • You’re hitting DB contention from polling

The good news: if you designed your jobs as idempotent handlers with clear job_type + payload, migrating to a queue later is mostly wiring.
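To sketch why the migration is "mostly wiring": if handlers are looked up by job_type, the same dispatch code serves the DB worker today and a queue consumer tomorrow. The registry below is illustrative, not tied to any specific queue library:

```python
import json
from typing import Callable, Dict

# Registry mapping job_type -> handler. The outbox worker and a future
# queue consumer can both route jobs through this one table.
HANDLERS: Dict[str, Callable[[dict], object]] = {}

def handler(job_type: str):
    """Decorator that registers a function as the handler for a job_type."""
    def register(fn):
        HANDLERS[job_type] = fn
        return fn
    return register

@handler("send_receipt_email")
def send_receipt_email(payload: dict) -> str:
    # Real implementation would call the email provider.
    return f"receipt queued for order {payload['order_id']}"

def dispatch(job_type: str, payload_json: str):
    """Look up and invoke the handler; unknown types fail loudly."""
    fn = HANDLERS.get(job_type)
    if fn is None:
        raise ValueError(f"Unknown job_type: {job_type}")
    return fn(json.loads(payload_json))
```

Swapping queues then means replacing only the loop that fetches (job_type, payload_json) pairs; every handler stays untouched.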

Key takeaways

  • BackgroundTasks is convenient, but it’s not a durable queue.
  • Persist jobs in an outbox table to avoid losing work on restarts.
  • Claim jobs safely, retry with backoff, and cap attempts.
  • Make handlers idempotent so retries don’t duplicate side effects.
  • Add a tiny “jobs status” endpoint so debugging isn’t guesswork.

A natural next step is a production-ready variant using Postgres SKIP LOCKED and a clean project layout (API + worker + migrations); the patterns above carry over directly.
