FastAPI + Async SQLAlchemy 2.0: Build a Real CRUD API (with Alembic Migrations)

FastAPI is great for shipping APIs quickly, but many junior/mid developers get stuck when they try to combine it with a “real” database layer: async I/O, connection pooling, sessions, and schema migrations. In this hands-on guide, you’ll build a small but production-shaped CRUD API using FastAPI, SQLAlchemy 2.0 (async), and Alembic for migrations.

We’ll implement:

  • Async DB engine + per-request session
  • A simple Task model and Pydantic schemas
  • CRUD endpoints with safe pagination
  • Alembic migrations so your schema doesn’t drift

1) Project setup

Install dependencies:

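pip install fastapi "uvicorn[standard]" sqlalchemy aiosqlite alembic pydantic

The quotes around uvicorn[standard] keep shells such as zsh from treating the square brackets as a glob pattern.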

This tutorial uses SQLite for simplicity (aiosqlite), but the patterns are the same for Postgres (asyncpg) or MySQL drivers.
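One way to keep that driver swap painless is to read the URL from the environment, so only configuration changes between SQLite and Postgres. The following is a minimal sketch, not part of the tutorial's code: the DATABASE_URL variable name, the get_database_url helper, and the Postgres credentials in the comment are all illustrative assumptions.

```python
import os
from typing import Mapping, Optional

# Default matches the SQLite URL used in this tutorial.
DEFAULT_DATABASE_URL = "sqlite+aiosqlite:///./dev.db"


def get_database_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the async database URL, falling back to local SQLite.

    `env` is injectable so the function can be exercised without touching
    the real process environment.
    """
    source = os.environ if env is None else env
    return source.get("DATABASE_URL", DEFAULT_DATABASE_URL)


# Hypothetical Postgres setting (requires `pip install asyncpg`):
#   DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/tasks
print(get_database_url({}))  # sqlite+aiosqlite:///./dev.db
```

If you adopt something like this, the value it returns would take the place of the hard-coded DATABASE_URL constant in app/db.py.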

Suggested structure:

app/
    __init__.py
    main.py
    db.py
    models.py
    schemas.py
    crud.py
    deps.py
alembic.ini
alembic/
    env.py
    versions/

2) Database engine and async session

Create app/db.py. The key idea: create one async engine for the app, and create a new session per request.

# app/db.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "sqlite+aiosqlite:///./dev.db"

# One engine for the whole application; it owns the connection pool.
engine = create_async_engine(
    DATABASE_URL,
    echo=False,  # set True to debug SQL
)

# Session factory: call AsyncSessionLocal() to get a new session.
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class Base(DeclarativeBase):
    """Base class for all ORM models (SQLAlchemy 2.0 style)."""

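expire_on_commit=False keeps loaded attributes usable after a commit. With the default (True), every attribute is expired on commit and reloaded on its next access. Under asyncio, that implicit reload cannot run from a plain attribute access and raises an error instead, which typically surfaces while the response is being serialized.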

3) Define a model

Create app/models.py:

# app/models.py
from datetime import datetime

from sqlalchemy import Boolean, DateTime, Integer, String, func
from sqlalchemy.orm import Mapped, mapped_column

from .db import Base


class Task(Base):
    __tablename__ = "tasks"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    done: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    # DateTime columns map to datetime objects, not strings.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),  # the database fills this in on INSERT
    )

This is intentionally minimal: an ID, a title, a boolean, and a timestamp.

4) Pydantic schemas (request/response contracts)

Create app/schemas.py. Separate “create/update” schemas from “read” schemas.

# app/schemas.py
from pydantic import BaseModel, ConfigDict, Field


class TaskCreate(BaseModel):
    title: str = Field(min_length=1, max_length=200)


class TaskUpdate(BaseModel):
    # Every field is optional so clients can send a partial update.
    title: str | None = Field(default=None, min_length=1, max_length=200)
    done: bool | None = None


class TaskRead(BaseModel):
    # Pydantic v2 configuration: SQLAlchemy -> Pydantic
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    done: bool

from_attributes=True tells Pydantic it can read from ORM objects.
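To see what that setting does in isolation, here is a small sketch that validates a plain attribute-bearing object instead of a real ORM row. It assumes Pydantic v2 (model_validate, model_dump, and the model_config = ConfigDict(...) spelling of the configuration); the SimpleNamespace object is only a stand-in for a Task instance.

```python
from types import SimpleNamespace

from pydantic import BaseModel, ConfigDict


class TaskRead(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    done: bool


# Stand-in for a SQLAlchemy Task: any object with matching attributes works.
orm_like = SimpleNamespace(id=1, title="Ship async CRUD", done=False, created_at="not exposed")

# Reads id/title/done via attribute access; created_at is not a declared field, so it is skipped.
task = TaskRead.model_validate(orm_like)
print(task.model_dump())  # {'id': 1, 'title': 'Ship async CRUD', 'done': False}
```

This is roughly what happens when a route declares response_model=TaskRead and returns an ORM object: only the fields named in the schema end up in the response.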

5) Dependency: one session per request

Create app/deps.py:

# app/deps.py
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from .db import AsyncSessionLocal


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # The session is closed automatically when the request finishes.
    async with AsyncSessionLocal() as session:
        yield session

FastAPI will inject this dependency into your routes, and each request gets a clean session.
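The lifecycle of a yield-based dependency can be easier to trust once you have watched it run. The sketch below uses only the standard library: FakeSession is a hypothetical stand-in for AsyncSession, and simulate_request approximates what a framework does around a handler (enter the dependency, run the handler, exit the dependency). It is an illustration of the pattern, not FastAPI's actual implementation.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Tuple


class FakeSession:
    """Stand-in for AsyncSession that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.closed = True


async def get_db() -> AsyncGenerator[FakeSession, None]:
    async with FakeSession() as session:
        yield session  # the route handler runs while the generator is paused here


async def simulate_request() -> Tuple[bool, bool]:
    # Enter the dependency, "handle" the request, then let the generator finish.
    async with asynccontextmanager(get_db)() as session:
        closed_during_handler = session.closed
    return closed_during_handler, session.closed


print(asyncio.run(simulate_request()))  # (False, True)
```

The session is open for the whole handler and closed right after it, which is the guarantee the real get_db() gives each request.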

6) CRUD functions (keep routes thin)

Create app/crud.py. This is where you’ll talk to SQLAlchemy and return models.

# app/crud.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from .models import Task
from .schemas import TaskCreate, TaskUpdate


async def create_task(db: AsyncSession, data: TaskCreate) -> Task:
    task = Task(title=data.title, done=False)
    db.add(task)
    await db.commit()
    await db.refresh(task)
    return task


async def get_task(db: AsyncSession, task_id: int) -> Task | None:
    result = await db.execute(select(Task).where(Task.id == task_id))
    return result.scalar_one_or_none()


async def list_tasks(db: AsyncSession, limit: int = 20, offset: int = 0) -> list[Task]:
    limit = min(max(limit, 1), 100)  # clamp to 1..100
    offset = max(offset, 0)
    result = await db.execute(
        select(Task)
        .order_by(Task.id.desc())
        .limit(limit)
        .offset(offset)
    )
    return list(result.scalars().all())


async def update_task(db: AsyncSession, task: Task, data: TaskUpdate) -> Task:
    # Only touch fields the client actually sent a value for.
    if data.title is not None:
        task.title = data.title
    if data.done is not None:
        task.done = data.done
    await db.commit()
    await db.refresh(task)
    return task


async def delete_task(db: AsyncSession, task: Task) -> None:
    await db.delete(task)
    await db.commit()

Notice the small but important details:

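  • await db.refresh(task) after commit reloads the row, so server-generated values such as created_at are populated (the id itself is already assigned when the INSERT is flushed).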
  • Pagination clamps values so clients can’t request 50,000 rows by accident.
  • CRUD is isolated from HTTP concerns (status codes, JSON, etc.).
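The clamping rule from list_tasks is easy to check on its own. Here it is pulled out into a pure function; the clamp_pagination name and the MAX_LIMIT constant are illustrative and do not appear in the tutorial's code.

```python
from typing import Tuple

MAX_LIMIT = 100  # same upper bound that list_tasks uses


def clamp_pagination(limit: int, offset: int, max_limit: int = MAX_LIMIT) -> Tuple[int, int]:
    """Force limit into 1..max_limit and make sure offset is never negative."""
    return min(max(limit, 1), max_limit), max(offset, 0)


print(clamp_pagination(50_000, -5))  # (100, 0)
print(clamp_pagination(0, 40))       # (1, 40)
print(clamp_pagination(20, 0))       # (20, 0)
```

Silently clamping is a design choice: the client always gets a valid page, but is never told that its limit was reduced. Rejecting out-of-range values with a validation error is the stricter alternative.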

7) FastAPI routes

Create app/main.py:

# app/main.py
from fastapi import Depends, FastAPI, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from . import crud
from .deps import get_db
from .schemas import TaskCreate, TaskRead, TaskUpdate

app = FastAPI(title="Tasks API")


@app.post("/tasks", response_model=TaskRead, status_code=status.HTTP_201_CREATED)
async def create_task(payload: TaskCreate, db: AsyncSession = Depends(get_db)):
    task = await crud.create_task(db, payload)
    return task


@app.get("/tasks", response_model=list[TaskRead])
async def list_tasks(limit: int = 20, offset: int = 0, db: AsyncSession = Depends(get_db)):
    return await crud.list_tasks(db, limit=limit, offset=offset)


@app.get("/tasks/{task_id}", response_model=TaskRead)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await crud.get_task(db, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


@app.patch("/tasks/{task_id}", response_model=TaskRead)
async def patch_task(task_id: int, payload: TaskUpdate, db: AsyncSession = Depends(get_db)):
    task = await crud.get_task(db, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return await crud.update_task(db, task, payload)


@app.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await crud.get_task(db, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    await crud.delete_task(db, task)
    return None

Run it:

uvicorn app.main:app --reload

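Open the interactive docs at /docs to see the endpoints. Nothing in the app creates the tasks table, so requests that touch the database will fail with a "no such table" error until you apply the migration in the next step. After that, try creating and listing tasks.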

8) Add Alembic migrations (so schema changes are tracked)

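Initialize Alembic in the project root. Because the database URL uses an async driver (aiosqlite), use Alembic's async template so that the generated env.py runs migrations through an async engine:

alembic init -t async alembic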

Edit alembic.ini to point at your DB URL (for SQLite here):

sqlalchemy.url = sqlite+aiosqlite:///./dev.db

Now update alembic/env.py so Alembic knows your models’ metadata. The important part is importing Base and setting target_metadata.

# alembic/env.py (only the relevant lines)
from app.db import Base
from app import models  # noqa: F401  ensure models are imported so tables register

target_metadata = Base.metadata

Create your first migration:

alembic revision --autogenerate -m "create tasks table"

Apply it:

alembic upgrade head

Now your database schema is generated from migrations, not “whatever happened to exist on disk.” When you change models later, you can autogenerate a new revision and review the diff before applying.

9) Quick manual test with curl

# Create
curl -X POST http://127.0.0.1:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title":"Ship async CRUD"}'

# List
curl "http://127.0.0.1:8000/tasks?limit=10&offset=0"

# Patch
curl -X PATCH http://127.0.0.1:8000/tasks/1 \
  -H "Content-Type: application/json" \
  -d '{"done":true}'

10) Practical tips (what tends to bite people)

  • Don’t share a session globally. Use a dependency that yields a new session per request, like get_db(). Global sessions cause cross-request leaks and weird transaction behavior.

  • Always commit and refresh when you need generated fields. If your response includes IDs or timestamps, await db.refresh(obj) makes sure you have the latest values.

  • Clamp pagination. Even in internal APIs, a missing limit can turn into a surprise full-table scan. Keep limit bounded (like 100).

  • Review Alembic autogenerate output. Autogenerate is helpful, not magical. Always open the revision file and sanity-check it before running upgrade.

Where to go next

Once you have this baseline working, the next steps that usually pay off are:

  • Switch SQLite to Postgres and use a real connection pool
  • Add filtering (e.g., ?done=true) using SQLAlchemy query composition
  • Add tests with pytest and an isolated test database
  • Introduce “service” functions if your business logic grows beyond CRUD

This stack (FastAPI + async SQLAlchemy + Alembic) is a solid foundation for many production APIs—and once you understand the session + migration flow, you’ll avoid the most common pitfalls that slow teams down.

