📡 You're offline — showing cached content
New version available!
Quick Access
Tutorials FastAPI Async Endpoints

Async Endpoints

6 min read Quiz at the end
Use async def for I/O-bound endpoints and asyncio.gather for parallel DB or HTTP calls.

Async Endpoints

import asyncio
import httpx
from fastapi import FastAPI

app = FastAPI()

# Async endpoint — handles concurrent requests efficiently
@app.get("/users/{id}")
async def get_user(id: int):
    async with httpx.AsyncClient() as client:
        url = "https://api.example.com/users/" + str(id)
        response = await client.get(url)
    return response.json()

# Async DB with databases library
from databases import Database

database = Database("postgresql+asyncpg://user:pass@localhost/mydb")

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

@app.get("/stats")
async def stats():
    # Run multiple async tasks in parallel
    users_task = database.fetch_val("SELECT COUNT(*) FROM users")
    posts_task = database.fetch_val("SELECT COUNT(*) FROM posts")
    users, posts = await asyncio.gather(users_task, posts_task)
    return {"users": users, "posts": posts}