FastAPI + Async SQLAlchemy CRUD You Can Actually Ship (Pagination, Filtering, Alembic)

If you’ve built a FastAPI demo before, you’ve probably hit the same wall: “How do I structure a real CRUD API with a database, migrations, and endpoints that won’t turn into spaghetti?” This guide gives you a practical, copy-pasteable pattern using FastAPI, SQLAlchemy (async), and Alembic. We’ll build a small “tasks” API with:

  • Async DB session dependency
  • Clean separation: models, schemas, repositories, routers
  • Pagination + basic filtering (without adding a heavy library)
  • Database migrations with Alembic
  • Consistent error handling

1) Project layout (small but scalable)

Here’s a structure that stays readable as your app grows:

app/
  main.py
  db.py
  models.py
  schemas.py
  repos/
    tasks.py
  routers/
    tasks.py
alembic.ini
alembic/
  env.py
  versions/

2) Install dependencies

We’ll use Postgres in production, but this works with SQLite too (with a different driver). Install:

pip install fastapi "uvicorn[standard]" sqlalchemy asyncpg alembic pydantic

If you want SQLite for local dev, install aiosqlite and point the DB URL at a file, for example sqlite+aiosqlite:///./dev.db.

3) Database setup with Async SQLAlchemy

Create app/db.py. This file owns your DB engine and the request-scoped session dependency.

# app/db.py
import os
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:postgres@localhost:5432/postgres",
)

engine = create_async_engine(
    DATABASE_URL,
    echo=False,          # set True during debugging
    pool_pre_ping=True,  # helps handle stale connections
)

SessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with SessionLocal() as session:
        yield session

Why this matters: every request gets its own AsyncSession, and FastAPI handles the lifetime via dependency injection.
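
To make that lifetime concrete, here is a minimal sketch that uses only the standard library. FakeSession and handle_request are hypothetical stand-ins (not SQLAlchemy or FastAPI APIs); the point is only the ordering: the code before `yield` runs first, the endpoint body runs next, and the cleanup after `yield` runs once the request is finished. FastAPI wraps generator dependencies in roughly this way.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records its lifecycle."""

    def __init__(self, log: list) -> None:
        self.log = log

    async def __aenter__(self) -> "FakeSession":
        self.log.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.log.append("close")


async def get_fake_session(log: list) -> AsyncGenerator[FakeSession, None]:
    # Same shape as get_session: open, hand over, clean up afterwards.
    async with FakeSession(log) as session:
        yield session


async def handle_request(log: list) -> None:
    # Approximation of what the framework does around an endpoint call.
    async with asynccontextmanager(get_fake_session)(log) as session:
        assert isinstance(session, FakeSession)
        log.append("endpoint")


events: list = []
asyncio.run(handle_request(events))
print(events)
```

The session is closed even if the endpoint raises, because the `async with` block inside the generator still unwinds.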

4) Define a model

Create a simple Task model in app/models.py.

# app/models.py
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Task(Base):
    __tablename__ = "tasks"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    done: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
    )

5) Pydantic schemas for input/output

Define request/response models in app/schemas.py. This avoids leaking DB internals into your API.

# app/schemas.py
from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field


class TaskCreate(BaseModel):
    title: str = Field(min_length=1, max_length=200)
    description: str | None = None


class TaskUpdate(BaseModel):
    title: str | None = Field(default=None, min_length=1, max_length=200)
    description: str | None = None
    done: bool | None = None


class TaskOut(BaseModel):
    # Pydantic v2 style: lets FastAPI build TaskOut from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    description: str | None
    done: bool
    created_at: datetime


class Page(BaseModel):
    items: list[TaskOut]
    total: int
    limit: int
    offset: int

6) Repository layer: keep DB code out of routers

Create app/repos/tasks.py. This layer makes your endpoints thin and testable.

# app/repos/tasks.py
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models import Task
from app.schemas import TaskCreate, TaskUpdate


async def create_task(session: AsyncSession, data: TaskCreate) -> Task:
    task = Task(title=data.title, description=data.description)
    session.add(task)
    await session.commit()
    await session.refresh(task)  # load server-side defaults such as created_at
    return task


async def get_task(session: AsyncSession, task_id: int) -> Task | None:
    result = await session.execute(select(Task).where(Task.id == task_id))
    return result.scalar_one_or_none()


async def list_tasks(
    session: AsyncSession,
    *,
    limit: int,
    offset: int,
    q: str | None = None,
    done: bool | None = None,
) -> tuple[list[Task], int]:
    stmt = select(Task)
    count_stmt = select(func.count(Task.id))

    # Apply the same filters to both the page query and the count query
    if q:
        like = f"%{q}%"
        stmt = stmt.where(Task.title.ilike(like))
        count_stmt = count_stmt.where(Task.title.ilike(like))
    if done is not None:
        stmt = stmt.where(Task.done == done)
        count_stmt = count_stmt.where(Task.done == done)

    stmt = stmt.order_by(Task.id.desc()).limit(limit).offset(offset)

    items_res = await session.execute(stmt)
    items = list(items_res.scalars().all())

    total_res = await session.execute(count_stmt)
    total = int(total_res.scalar_one())
    return items, total


async def update_task(session: AsyncSession, task: Task, data: TaskUpdate) -> Task:
    # Apply partial updates: None means "not provided", so a field
    # cannot be reset to NULL through this function.
    if data.title is not None:
        task.title = data.title
    if data.description is not None:
        task.description = data.description
    if data.done is not None:
        task.done = data.done
    await session.commit()
    await session.refresh(task)
    return task


async def delete_task(session: AsyncSession, task: Task) -> None:
    await session.delete(task)
    await session.commit()
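
One detail of the title filter is worth a sketch: a search term is placed between two % wildcards, so a user who types % or _ gets wildcard behavior rather than a literal match. The helpers below (escape_like and contains_pattern are hypothetical names, not part of SQLAlchemy) show one way to escape the term, assuming a backslash as the escape character.

```python
def escape_like(term: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so they match literally."""
    return (
        term.replace(escape, escape + escape)  # escape the escape char first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def contains_pattern(term: str) -> str:
    """Build a 'contains' pattern around an escaped search term."""
    return f"%{escape_like(term)}%"


print(contains_pattern("50%_off"))
```

With SQLAlchemy, the pattern would be passed along with the matching escape character, for example Task.title.ilike(contains_pattern(q), escape="\\"). Whether this matters depends on your data; for free-text titles it is usually harmless to leave wildcards alone.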

Note: You’ll see people write “generic CRUD” helpers. They’re fine, but for juniors/mids it’s often clearer to write explicit functions per entity at first.

7) Routers: clean endpoints + predictable errors

Create app/routers/tasks.py.

# app/routers/tasks.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_session
from app.repos import tasks as repo
from app.schemas import Page, TaskCreate, TaskOut, TaskUpdate

router = APIRouter(prefix="/tasks", tags=["tasks"])


@router.post("", response_model=TaskOut, status_code=status.HTTP_201_CREATED)
async def create_task(
    payload: TaskCreate,
    session: AsyncSession = Depends(get_session),
):
    return await repo.create_task(session, payload)


@router.get("/{task_id}", response_model=TaskOut)
async def read_task(
    task_id: int,
    session: AsyncSession = Depends(get_session),
):
    task = await repo.get_task(session, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@router.get("", response_model=Page)
async def read_tasks(
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    q: str | None = Query(None, min_length=1, max_length=100),
    done: bool | None = None,
    session: AsyncSession = Depends(get_session),
):
    items, total = await repo.list_tasks(
        session, limit=limit, offset=offset, q=q, done=done
    )
    return {
        "items": items,
        "total": total,
        "limit": limit,
        "offset": offset,
    }


@router.patch("/{task_id}", response_model=TaskOut)
async def patch_task(
    task_id: int,
    payload: TaskUpdate,
    session: AsyncSession = Depends(get_session),
):
    task = await repo.get_task(session, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return await repo.update_task(session, task, payload)


@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_task(
    task_id: int,
    session: AsyncSession = Depends(get_session),
):
    task = await repo.get_task(session, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    await repo.delete_task(session, task)
    return None

This gives you a simple, consistent API:

  • POST /tasks
  • GET /tasks?limit=20&offset=0&q=foo&done=false
  • GET /tasks/123
  • PATCH /tasks/123
  • DELETE /tasks/123
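
On the consuming side, the total, limit, and offset fields in the list response are enough to derive page numbers and next/previous links. The helper below is a small sketch of that arithmetic; page_info is a hypothetical name, and the returned keys are illustrative rather than part of the API.

```python
import math


def page_info(total: int, limit: int, offset: int) -> dict:
    """Derive navigation values from the fields a Page response carries."""
    if limit < 1 or offset < 0 or total < 0:
        raise ValueError("limit must be >= 1; offset and total must be >= 0")
    has_next = offset + limit < total
    return {
        "page": offset // limit + 1,           # 1-based current page
        "pages": math.ceil(total / limit),     # total number of pages
        "next_offset": offset + limit if has_next else None,
        "prev_offset": max(offset - limit, 0) if offset > 0 else None,
    }


print(page_info(total=45, limit=20, offset=20))
```

For 45 tasks at 20 per page, an offset of 20 is page 2 of 3, with the next page at offset 40 and the previous one at offset 0.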

8) Wire it up in FastAPI

Create app/main.py.

# app/main.py
from fastapi import FastAPI

from app.routers.tasks import router as tasks_router

app = FastAPI(title="Tasks API")
app.include_router(tasks_router)


@app.get("/health")
async def health():
    return {"status": "ok"}

Run it:

uvicorn app.main:app --reload

9) Add migrations with Alembic

Initialize Alembic (run once in your project root):

alembic init alembic

Then configure Alembic to “see” your models. In alembic/env.py, set the metadata and database URL. The key parts look like:

# alembic/env.py (important bits)
import asyncio
import os
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context
from app.models import Base  # <-- import your Base

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def get_url() -> str:
    return os.getenv(
        "DATABASE_URL",
        "postgresql+asyncpg://postgres:postgres@localhost:5432/postgres",
    )


def do_run_migrations(connection: Connection) -> None:
    # Plain sync function: Alembic runs it inside connection.run_sync()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    configuration = config.get_section(config.config_ini_section) or {}
    configuration["sqlalchemy.url"] = get_url()
    connectable = async_engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())


# Offline mode is omitted from this excerpt.
run_migrations_online()

Now generate and apply your first migration:

alembic revision --autogenerate -m "create tasks table"
alembic upgrade head

Tip: If autogenerate produces unexpected diffs, it usually means Alembic can’t find your models/metadata or your DB URL isn’t what you think it is.
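
When the DB URL is the suspect, it helps to print which database the process is actually pointed at without leaking the password. The function below is a standard-library sketch; describe_db_url is a hypothetical helper, not an Alembic or SQLAlchemy API.

```python
from urllib.parse import urlsplit


def describe_db_url(url: str) -> str:
    """Return the URL with any password masked, for safe logging."""
    parts = urlsplit(url)
    user = parts.username or ""
    if parts.password:
        auth = f"{user}:***@"
    elif user:
        auth = f"{user}@"
    else:
        auth = ""
    host = parts.hostname or ""
    port = f":{parts.port}" if parts.port else ""
    return f"{parts.scheme}://{auth}{host}{port}{parts.path}"


print(describe_db_url("postgresql+asyncpg://postgres:postgres@localhost:5432/postgres"))
```

Calling it on the value returned by get_url() in alembic/env.py, and again on DATABASE_URL in app/db.py, is a quick way to confirm that the app and the migrations agree on the target database.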

10) Try it with curl (real requests)

# Create a task
curl -X POST http://127.0.0.1:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title":"Ship FastAPI CRUD","description":"Add async SQLAlchemy + migrations"}'

# List tasks (paginated)
curl "http://127.0.0.1:8000/tasks?limit=10&offset=0"

# Filter by query string
curl "http://127.0.0.1:8000/tasks?q=Ship"

# Mark done
curl -X PATCH http://127.0.0.1:8000/tasks/1 \
  -H "Content-Type: application/json" \
  -d '{"done": true}'

# Delete
curl -X DELETE http://127.0.0.1:8000/tasks/1 -i

Common pitfalls (and how to avoid them)

  • Blocking DB drivers: If you use Postgres, prefer asyncpg with postgresql+asyncpg. A sync driver called from an async endpoint blocks the event loop, so every other request on that worker waits until the query returns.

  • Session leaks: Always create sessions via a dependency (like get_session) so lifecycle is managed per-request.

  • Pagination without totals: Returning only items is easy, but adding total makes frontend pagination much nicer. It’s worth the extra count query for most CRUD screens.

  • Over-validating early: Use your TaskCreate/TaskUpdate schemas to enforce the obvious constraints (length, required fields) and stop there. Keep complex business rules in a service layer if/when you need them.
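
The first pitfall is easy to demonstrate without a database. In the sketch below, time.sleep stands in for a call through a sync driver and asyncio.sleep for an awaited call through an async one; both function names are hypothetical. Three concurrent "queries" of 0.1 s each take about 0.3 s in the blocking case and about 0.1 s in the async case.

```python
import asyncio
import time


async def blocking_query() -> None:
    # Stand-in for a sync driver call: the event loop is stuck here.
    time.sleep(0.1)


async def async_query() -> None:
    # Stand-in for an async driver call: other tasks run while we wait.
    await asyncio.sleep(0.1)


async def timed(query, n: int = 3) -> float:
    """Run n queries concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(query() for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_query))
async_elapsed = asyncio.run(timed(async_query))
print(f"blocking: {blocking_elapsed:.2f}s, async: {async_elapsed:.2f}s")
```

Exact timings vary by machine, but the blocking variant always scales with the number of concurrent requests.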

Where to go next

Once you’re comfortable with this baseline, you can incrementally add:

  • Structured logging and request IDs for debugging production issues.

  • Background jobs (e.g., send emails) using a queue like RQ/Celery, or simple BackgroundTasks for lightweight tasks.

  • Testing with a temporary database (or a transaction rollback strategy) so CI runs fast and reliably.

This pattern—async session dependency, repositories, routers, and migrations—covers the core of most “real” FastAPI CRUD services. It’s simple enough for junior devs to follow, but structured enough that you won’t hate it six months later.

