FastAPI Background Jobs, the Practical Way: Reliable Tasks with Celery + Redis (Hands-On)

FastAPI is great at handling HTTP requests quickly, but sometimes you shouldn't do the work inside the request/response cycle. Sending emails, generating reports, resizing images, syncing with third-party APIs, or running expensive database operations can slow your API down and cause timeouts.

In this hands-on guide, you’ll wire FastAPI to a real background job system using Celery and Redis. You’ll build:

  • A FastAPI endpoint that enqueues a job
  • A Celery worker that runs jobs outside the API process
  • A status endpoint to track progress
  • Retry + backoff for flaky tasks

This approach is production-friendly and works well for junior/mid developers because the moving parts are explicit and debuggable.

Why not just use BackgroundTasks?

FastAPI includes BackgroundTasks, but it runs tasks in the same process (after the response). That means:

  • If your API process restarts, the task may be lost
  • Long tasks still consume your API worker resources
  • No built-in retries, scheduling, or queueing features

Celery + Redis gives you durability (queue), isolation (separate worker), retries, and visibility.

Project structure

Create a minimal structure like this:

fastapi-celery-demo/
  app/
    __init__.py
    main.py
    celery_app.py
    tasks.py
  requirements.txt

Install dependencies

Add these to requirements.txt:

fastapi==0.115.0
uvicorn[standard]==0.30.6
celery==5.4.0
redis==5.0.8
pydantic==2.8.2

Install:

pip install -r requirements.txt

You also need Redis running locally. If you already have Redis installed, start it. Otherwise, run Redis however you normally do in dev (system package, container, or hosted dev instance).

Configure Celery

Create app/celery_app.py:

from celery import Celery

# Redis is used both as a broker (queue) and as a result backend (status storage).
# In production you might use RabbitMQ as broker and Redis as backend,
# but this is great for starting out.
celery = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

celery.conf.update(
    task_track_started=True,       # enables "STARTED" state
    task_time_limit=60,            # hard limit (seconds) - kill runaway tasks
    task_soft_time_limit=55,       # soft limit (seconds) - raises SoftTimeLimitExceeded
    worker_prefetch_multiplier=1,  # better fairness for long tasks
)

Two Redis DBs are used above (/0 and /1) so queue traffic and result/status data don’t mix. It’s not required, but it’s a common and simple separation.

Write a task with retries and progress updates

Create app/tasks.py:

import random
import time

from .celery_app import celery


@celery.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_jitter=True,
    max_retries=5,
)
def generate_report(self, user_id: int) -> dict:
    """
    Example job:
    - Simulates work in steps
    - Updates progress so the API can show status
    - Randomly fails sometimes to demonstrate retry/backoff
    """
    total_steps = 5
    for step in range(1, total_steps + 1):
        # Simulate doing something expensive
        time.sleep(1)

        # Update progress metadata stored in the backend
        self.update_state(
            state="PROGRESS",
            meta={"current": step, "total": total_steps, "message": f"Step {step}/{total_steps}"},
        )

        # Random failure demonstration (20% chance) to show retries
        if random.random() < 0.2:
            raise Exception("Transient error while generating report")

    # Return value is stored in the result backend on SUCCESS
    return {"user_id": user_id, "report_url": f"https://example.com/reports/{user_id}.pdf"}

Key takeaways:

  • bind=True gives you access to self, which lets you call self.update_state(...)
  • autoretry_for + retry_backoff gives you automatic retries with exponential backoff
  • The task returns a JSON-serializable dict for the API to show later
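To build intuition for what retry_backoff=True does, here is a stand-alone sketch (plain Python, not Celery code) of the schedule it follows: the delay roughly doubles on each retry, capped by Celery's retry_backoff_max (600 seconds by default), and retry_jitter replaces each delay with a uniform random value between 0 and the computed delay ("full jitter") so retries don't stampede:

```python
import random

def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 600.0, jitter: bool = False):
    """Exponential backoff schedule: base * 2**retry seconds, capped at `cap`."""
    delays = []
    for retry in range(max_retries):
        delay = min(base * (2 ** retry), cap)
        if jitter:
            # Full jitter: pick uniformly in [0, delay], like Celery's retry_jitter.
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays

print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

With max_retries=5 as in the task above, a permanently failing job gives up after roughly 31 seconds of cumulative waiting (less with jitter).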

Build the FastAPI app

Create app/main.py:

from celery.result import AsyncResult
from fastapi import FastAPI
from pydantic import BaseModel

from .celery_app import celery
from .tasks import generate_report

app = FastAPI(title="FastAPI + Celery Demo")


class ReportRequest(BaseModel):
    user_id: int


@app.post("/reports")
def create_report(payload: ReportRequest):
    # Enqueue a job. Returns immediately.
    task = generate_report.delay(payload.user_id)
    return {"task_id": task.id}


@app.get("/reports/{task_id}")
def get_report_status(task_id: str):
    result = AsyncResult(task_id, app=celery)

    # Celery states: PENDING, STARTED, RETRY, FAILURE, SUCCESS, plus custom ones like PROGRESS
    response = {
        "task_id": task_id,
        "state": result.state,
    }

    # result.info holds meta for PROGRESS, exception info for FAILURE, or the return value for SUCCESS
    if result.state == "PROGRESS":
        response["progress"] = result.info
    elif result.state == "SUCCESS":
        response["result"] = result.result
    elif result.state == "FAILURE":
        # Be careful exposing internal errors in production. Keep it simple for the demo.
        response["error"] = str(result.info)

    return response

That’s the whole API layer: enqueue a job, and query a job.

Run it locally

Open three terminals:

  • Terminal 1: Redis
  • Terminal 2: Celery worker
  • Terminal 3: FastAPI

Start the worker (from the project root):

celery -A app.celery_app.celery worker --loglevel=INFO

Start the API:

uvicorn app.main:app --reload

Try it: enqueue a report, then poll status

Enqueue:

curl -X POST http://127.0.0.1:8000/reports \
  -H "Content-Type: application/json" \
  -d '{"user_id": 123}'

You’ll get:

{"task_id":"...some-id..."}

Poll status:

curl http://127.0.0.1:8000/reports/<TASK_ID>

Expected progression:

  • PENDING → worker hasn’t picked it up yet
  • STARTED / PROGRESS → it’s running and publishing metadata
  • RETRY → it failed but will retry automatically
  • SUCCESS → you’ll see {"report_url": ...}
  • FAILURE → retries exhausted (or a non-retriable exception)
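The polling above can be scripted. A small client-side sketch, where fetch is a hypothetical callable that GETs /reports/<task_id> and returns the decoded JSON (so the helper stays independent of any HTTP library):

```python
import time

def poll_until_done(fetch, interval: float = 1.0, timeout: float = 60.0) -> dict:
    """Call fetch() until the task reaches a terminal state or we time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # e.g. fetch = lambda: requests.get(f"{base}/reports/{task_id}").json()
        status = fetch()
        if status["state"] in ("SUCCESS", "FAILURE"):
            return status
        time.sleep(interval)
    raise TimeoutError("task did not reach a terminal state in time")
```

PENDING, PROGRESS, and RETRY all fall through and keep polling; only SUCCESS and FAILURE are terminal.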

Make jobs idempotent (so retries don’t hurt you)

Retries are great, but they can cause duplicate side effects if you’re not careful. A classic example: a task sends an email, fails after sending, retries, and sends the email again.

Practical rules of thumb:

  • When possible, make tasks idempotent: running twice yields the same final outcome.
  • Store a “job already completed” marker in your database for side-effect steps.
  • Use unique keys (like user_id + report_date) to avoid duplicates.

If your report is generated and uploaded, write the output URL to a DB row keyed by (user_id, report_version). On retry, check if it already exists and return it instead of regenerating.
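A minimal sketch of that check-before-work pattern, with a plain dict standing in for the database table (completed_reports, build_report, and generate_report_idempotent are illustrative names, not part of the demo app):

```python
# Stand-in for a DB table keyed by (user_id, report_version).
completed_reports: dict = {}

def build_report(user_id: int, report_version: int) -> str:
    # Hypothetical expensive step (render PDF, upload, etc.).
    return f"https://example.com/reports/{user_id}-v{report_version}.pdf"

def generate_report_idempotent(user_id: int, report_version: int) -> str:
    key = (user_id, report_version)
    # On retry, the completion marker already exists: return it instead of regenerating.
    if key in completed_reports:
        return completed_reports[key]
    url = build_report(user_id, report_version)
    completed_reports[key] = url  # record completion alongside the side effect
    return url
```

Running the task twice with the same key now yields the same URL and does the expensive work only once, which is exactly what makes retries safe.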

Common production tweaks (keep it simple, but know what’s next)

  • task_time_limit and task_soft_time_limit prevent stuck jobs.
  • Separate queues for different workloads (emails vs. exports) to avoid starvation.
  • Limit concurrency for expensive tasks so you don’t melt your DB.
  • Don’t expose raw exception strings to clients—map failures to safe messages.
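The "separate queues" bullet is mostly configuration. A hedged sketch, assuming task names like app.tasks.send_email and app.tasks.generate_report (the email task is hypothetical here): route each task to its own queue, then run dedicated workers with different concurrency:

```python
# Route tasks to dedicated queues (task names here are illustrative).
task_routes = {
    "app.tasks.send_email": {"queue": "emails"},
    "app.tasks.generate_report": {"queue": "exports"},
}
# Apply it via: celery.conf.task_routes = task_routes

# Then run one worker per queue, e.g.:
#   celery -A app.celery_app.celery worker -Q emails  --concurrency=8
#   celery -A app.celery_app.celery worker -Q exports --concurrency=2
print(task_routes["app.tasks.generate_report"]["queue"])  # exports
```

A flood of cheap email jobs can no longer starve the slow export queue, and the expensive tasks are capped at a concurrency your database can tolerate.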

If you want a quick upgrade path for visibility, add Flower (Celery monitoring UI) later. It’s optional, but it makes debugging queues much easier once you have more than a few tasks.

Practical checklist for junior/mid devs shipping this

  • API endpoint enqueues a job and returns task_id
  • Worker runs jobs separately from the API
  • Status endpoint returns state + progress + result
  • Tasks have retries with backoff for transient failures
  • Side effects are protected via idempotency rules

With this setup, your FastAPI app stays responsive, your long-running work becomes reliable, and you gain clear operational behavior (queueing, retries, progress, and results) without overengineering.

