Cookbooks
FastAPI CRUD Endpoints with Pydantic Models
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from typing import List, Optional
from uuid import UUID, uuid4

app = FastAPI()

class Item(BaseModel):
    id: Optional[UUID] = None
    name: str
    description: Optional[str] = None
    price: float
    is_active: bool = True

# In-memory storage for demo purposes
items_db = {}

@app.post("/items/", response_model=Item, status_code=status.HTTP_201_CREATED)
def create_item(item: Item):
    item_id = uuid4()
    item.id = item_id
    items_db[item_id] = item
    return item

@app.get("/items/", response_model=List[Item])
def read_items(skip: int = 0, limit: int = 100):
    return list(items_db.values())[skip : skip + limit]

@app.get("/items/{item_id}", response_model=Item)
def read_item(item_id: UUID):
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    return items_db[item_id]

@app.put("/items/{item_id}", response_model=Item)
def update_item(item_id: UUID, item: Item):
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    item.id = item_id
    items_db[item_id] = item
    return item

@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_item(item_id: UUID):
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    del items_db[item_id]
    return None
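
A quick way to exercise these endpoints is FastAPI's built-in TestClient. The sketch below is a minimal smoke test; it assumes the code above is saved as main.py (the module name is an assumption, adjust the import to your layout).

# Smoke test for the CRUD endpoints above (hypothetical module name main.py)
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

# Create an item, then read it back by the server-assigned ID
created = client.post("/items/", json={"name": "Widget", "price": 9.99}).json()
assert client.get(f"/items/{created['id']}").status_code == 200

# Update it, then delete it
client.put(f"/items/{created['id']}", json={"name": "Widget v2", "price": 12.5})
assert client.delete(f"/items/{created['id']}").status_code == 204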
SQLAlchemy ORM Session Management Context Manager
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager
from typing import Iterator

# Database configuration
DATABASE_URL = "sqlite:///./test.db"

# Create SQLAlchemy engine and session factory.
# expire_on_commit=False keeps attributes loaded on returned objects after
# the session commits and closes; otherwise accessing them afterwards would
# raise DetachedInstanceError.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(
    autocommit=False, autoflush=False, expire_on_commit=False, bind=engine
)

# Create base class for ORM models
Base = declarative_base()

# Example model
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)

# Create all tables
Base.metadata.create_all(bind=engine)

@contextmanager
def get_db() -> Iterator[Session]:
    """Context manager for database sessions.

    This ensures the session is properly closed after use,
    and handles rollbacks in case of exceptions.

    Usage:
        with get_db() as db:
            db.query(User).all()
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

# Example usage
def add_user(username: str, email: str, password: str) -> User:
    """Add a new user to the database."""
    with get_db() as db:
        user = User(username=username, email=email, hashed_password=password)
        db.add(user)
        db.flush()  # Flush to get the ID
        db.refresh(user)
        return user
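
The same get_db() context manager works for reads. A small sketch follows; the helper names are hypothetical, not part of the recipe above. Because the session closes when the with block exits, returned objects are detached, which is why the sessionmaker above disables expire_on_commit.

# Hypothetical read helpers built on the get_db() context manager above
from typing import List, Optional

def get_user_by_username(username: str) -> Optional[User]:
    with get_db() as db:
        return db.query(User).filter(User.username == username).first()

def list_usernames() -> List[str]:
    # Extract plain data while the session is still open
    with get_db() as db:
        return [u.username for u in db.query(User).all()]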
Redis Cache Decorator for Function Results
import functools
import json
import hashlib
import redis
import inspect
from typing import Any, Callable, Dict, Optional, Tuple

# Redis client setup
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def redis_cache(
    expire: Optional[int] = 300,  # Cache expiration in seconds
    prefix: str = "cache:",       # Cache key prefix
    serialize: Callable = json.dumps,
    deserialize: Callable = json.loads
):
    """Decorator to cache function results in Redis.

    Args:
        expire: Time in seconds before cache expires (None for no expiration)
        prefix: Prefix for cache keys
        serialize: Function to serialize data before storing
        deserialize: Function to deserialize data after retrieval
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Create a unique cache key based on function name and arguments
            cache_key = _create_cache_key(func, prefix, args, kwargs)
            # Try to get the result from cache; compare against None so that
            # falsy cached values (0, "", []) still count as hits
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return deserialize(cached_result)
            # If not in cache, call the function
            result = func(*args, **kwargs)
            # Store result in cache
            serialized_result = serialize(result)
            if expire is not None:
                redis_client.setex(cache_key, expire, serialized_result)
            else:
                redis_client.set(cache_key, serialized_result)
            return result
        return wrapper
    return decorator

def _create_cache_key(func: Callable, prefix: str, args: Tuple, kwargs: Dict) -> str:
    """Create a unique cache key based on the function and its arguments."""
    # Get function signature including parameter names
    sig = inspect.signature(func)
    bound_args = sig.bind(*args, **kwargs)
    bound_args.apply_defaults()
    # Create a string representation of the arguments
    args_repr = json.dumps({
        k: v for k, v in bound_args.arguments.items()
        if isinstance(v, (str, int, float, bool, list, dict, tuple)) or v is None
    }, sort_keys=True)
    # Create a hash of the function name and arguments
    key_parts = f"{func.__module__}.{func.__name__}:{args_repr}"
    key_hash = hashlib.md5(key_parts.encode()).hexdigest()
    return f"{prefix}{key_hash}"

# Example usage
@redis_cache(expire=60)
def get_user_data(user_id: int) -> Dict:
    # This would normally be an expensive database query
    print(f"Cache miss for user_id {user_id}")
    return {"id": user_id, "name": f"User {user_id}", "email": f"user{user_id}@example.com"}
Background Task Queue with Celery
# tasks.py
from celery import Celery
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Celery configuration
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@celery_app.task(bind=True, max_retries=3)
def send_email(self, recipient, subject, body):
    """Send email asynchronously with retry logic."""
    try:
        # Email sending configuration
        sender_email = "your-email@example.com"
        password = "your-app-password"  # Use app-specific password for better security

        # Create message
        message = MIMEMultipart()
        message["From"] = sender_email
        message["To"] = recipient
        message["Subject"] = subject
        message.attach(MIMEText(body, "plain"))

        # Connect to SMTP server and send email
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, password)
            server.send_message(message)
        return {"status": "success", "recipient": recipient}
    except Exception as exc:
        # Retry with exponential backoff; self.retry() raises a Retry
        # exception, so re-raise it to hand control back to the worker
        retry_delay = 60 * (2 ** self.request.retries)  # 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=retry_delay)

@celery_app.task
def process_data(data):
    """Process data asynchronously."""
    # Simulate long-running task
    time.sleep(5)
    result = {"processed": len(data), "status": "completed"}
    return result

# In your application code:
def enqueue_email_task(recipient, subject, body):
    """Enqueue an email task to be processed asynchronously."""
    task = send_email.delay(recipient, subject, body)
    return {"task_id": task.id}

def check_task_status(task_id):
    """Check the status of a task."""
    task = celery_app.AsyncResult(task_id)
    response = {
        "task_id": task_id,
        "status": task.status,
    }
    if task.status == 'SUCCESS':
        response["result"] = task.get()
    return response
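
These tasks only run once a worker process is started alongside the broker. A minimal sketch of the wiring, assuming the module above is saved as tasks.py as its header comment suggests:

# In one shell, start a worker (command shown as a comment):
#   celery -A tasks worker --loglevel=info
# Then enqueue and poll from application code:
from tasks import enqueue_email_task, check_task_status

info = enqueue_email_task("user@example.com", "Hello", "Welcome aboard!")
print(check_task_status(info["task_id"]))  # e.g. {'task_id': '...', 'status': 'PENDING'}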
JWT Authentication for FastAPI
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Security configuration
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

# Models
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

# Simulated user database
users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
    }
}

# Helper functions
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(users_db, token_data.username)
    if not user:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# Routes
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user
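
The whole login flow can be checked end to end with TestClient. A hedged sketch, assuming the code above is saved as auth.py (the module name is an assumption) and that python-multipart is installed for form parsing:

# End-to-end check of the token flow (hypothetical module name auth.py)
from fastapi.testclient import TestClient
from auth import app

client = TestClient(app)

# OAuth2PasswordRequestForm expects form-encoded fields, not JSON
resp = client.post("/token", data={"username": "johndoe", "password": "secret"})
token = resp.json()["access_token"]

me = client.get("/users/me", headers={"Authorization": f"Bearer {token}"})
assert me.json()["username"] == "johndoe"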
API Response Caching with FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import asyncio
import time
import hashlib
import json
from typing import Dict, Any, Callable, Optional
import functools

app = FastAPI()

# Simple in-memory cache
cache_store: Dict[str, Dict[str, Any]] = {}

def cache_response(
    expire_after_seconds: int = 60,
    vary_by_headers: Optional[list] = None,
    vary_by_query: bool = True
):
    """Cache FastAPI response.

    Args:
        expire_after_seconds: Time in seconds cache is valid
        vary_by_headers: List of headers to include in cache key
        vary_by_query: Whether to vary cache by query parameters
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract request from args or kwargs
            request = next((arg for arg in args if isinstance(arg, Request)),
                           kwargs.get('request'))
            if not request:
                # If no request is found, just execute the function
                return await func(*args, **kwargs)

            # Build the cache key
            key_parts = [request.url.path]
            # Add query parameters if required
            if vary_by_query and request.query_params:
                key_parts.append(str(dict(request.query_params)))
            # Add specified headers if required
            if vary_by_headers:
                for header in vary_by_headers:
                    header_value = request.headers.get(header, '')
                    if header_value:
                        key_parts.append(f"{header}:{header_value}")
            # Create a unique key by hashing the combined parts
            cache_key = hashlib.md5(''.join(key_parts).encode()).hexdigest()

            # Check if response is in cache and not expired
            current_time = time.time()
            if cache_key in cache_store:
                cached_data = cache_store[cache_key]
                if current_time - cached_data['timestamp'] < expire_after_seconds:
                    return JSONResponse(
                        content=cached_data['data'],
                        headers={
                            'X-Cache': 'HIT',
                            'X-Cache-Timestamp': str(cached_data['timestamp'])
                        }
                    )

            # Execute the route handler
            result = await func(*args, **kwargs)

            # Route handlers usually return plain dicts: cache those directly
            # (checking only for a .body attribute would never match a dict,
            # so nothing would ever be cached for typical handlers)
            if isinstance(result, dict):
                cache_store[cache_key] = {'data': result, 'timestamp': current_time}
                return JSONResponse(
                    content=result,
                    headers={
                        'X-Cache': 'MISS',
                        'X-Cache-Timestamp': str(current_time)
                    }
                )

            # If a Response object came back, cache its body when it is JSON
            if hasattr(result, 'body'):
                try:
                    response_data = json.loads(result.body)
                    cache_store[cache_key] = {
                        'data': response_data,
                        'timestamp': current_time
                    }
                    # Add cache headers to the response
                    result.headers['X-Cache'] = 'MISS'
                    result.headers['X-Cache-Timestamp'] = str(current_time)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Don't cache if not JSON
                    pass
            return result
        return wrapper
    return decorator

# Example usage
@app.get("/slow-data")
@cache_response(expire_after_seconds=30)
async def get_slow_data(request: Request):
    # Simulate a slow operation without blocking the event loop
    await asyncio.sleep(2)
    return {"data": "This is slow data that's now cached", "time": time.time()}

@app.get("/user-specific")
@cache_response(expire_after_seconds=60, vary_by_headers=["Authorization"])
async def get_user_data(request: Request):
    # This would vary by the Authorization header
    return {"user_data": "Personalized content", "time": time.time()}
Boto3 S3 File Upload with Progress Tracking
import os
import sys
import threading
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError

class ProgressPercentage:
    """Track and display S3 upload progress."""

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self._prev_percentage = 0

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = int((self._seen_so_far / self._size) * 100)
            # Only update display if percentage changes
            if percentage > self._prev_percentage:
                self._prev_percentage = percentage
                sys.stdout.write(
                    f"\r{self._filename} upload: {percentage}% complete")
                sys.stdout.flush()

def upload_file_to_s3(
    file_path,
    bucket_name,
    object_name=None,
    extra_args=None,
    multipart_threshold=8 * 1024 * 1024,  # 8MB
    multipart_chunksize=8 * 1024 * 1024,  # 8MB
    use_threads=True,
    max_concurrency=10
):
    """Upload a file to an S3 bucket with progress tracking.

    Args:
        file_path: File to upload
        bucket_name: S3 bucket name
        object_name: S3 object name (if None, uses file basename)
        extra_args: Extra arguments for S3 client
        multipart_threshold: Threshold for multipart upload
        multipart_chunksize: Size of multipart chunks
        use_threads: Whether to use threads for multipart upload
        max_concurrency: Maximum number of threads

    Returns:
        True if file was uploaded, False otherwise
    """
    # If object_name not specified, use the file's basename
    if object_name is None:
        object_name = os.path.basename(file_path)

    # Set up the transfer configuration for multipart uploads
    config = TransferConfig(
        multipart_threshold=multipart_threshold,
        multipart_chunksize=multipart_chunksize,
        use_threads=use_threads,
        max_concurrency=max_concurrency
    )

    # Create S3 client
    s3_client = boto3.client('s3')
    try:
        # Set up the progress tracking callback
        progress_callback = ProgressPercentage(file_path)
        # Upload the file with progress tracking
        s3_client.upload_file(
            file_path,
            bucket_name,
            object_name,
            ExtraArgs=extra_args,
            Callback=progress_callback,
            Config=config
        )
        print(f"\nUpload complete: {file_path} -> s3://{bucket_name}/{object_name}")
        return True
    except ClientError as e:
        print(f"\nError uploading {file_path}: {e}")
        return False

# Example usage
def main():
    # Example file upload with content type
    upload_file_to_s3(
        file_path="large_video.mp4",
        bucket_name="my-media-bucket",
        object_name="videos/large_video.mp4",
        extra_args={
            'ContentType': 'video/mp4',
            'ACL': 'public-read'
        }
    )

if __name__ == "__main__":
    main()
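
After an upload it can be worth confirming the object actually landed. head_object is a cheap way to do that; the sketch below reuses the bucket and key from the example and introduces a hypothetical object_exists helper (boto3 and ClientError are already imported above).

# Hypothetical helper: verify an uploaded object and report its metadata
def object_exists(bucket_name, object_name):
    s3_client = boto3.client('s3')
    try:
        head = s3_client.head_object(Bucket=bucket_name, Key=object_name)
        print(f"{object_name}: {head['ContentLength']} bytes, {head.get('ContentType')}")
        return True
    except ClientError:
        return False

object_exists("my-media-bucket", "videos/large_video.mp4")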
Prometheus Metrics for FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time

app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Define Prometheus metrics
REQUEST_COUNT = Counter(
    'app_request_count',
    'Application request count',
    ['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
    'app_request_latency_seconds',
    'Application request latency',
    ['method', 'endpoint']
)

# Middleware to track request metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    # Process the request
    response = await call_next(request)
    # Record request latency
    latency = time.time() - start_time
    endpoint = request.url.path
    # Track latency for this endpoint
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=endpoint
    ).observe(latency)
    # Track request count for this endpoint and status code
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=endpoint,
        status=response.status_code
    ).inc()
    return response

# Expose Prometheus metrics
@app.get('/metrics')
def metrics():
    return Response(
        generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )

# Example endpoints
@app.get('/')
def home():
    return {"message": "Welcome to the API"}

@app.get('/api/users')
def get_users():
    # Simulate some processing time
    time.sleep(0.2)
    return {"users": ["user1", "user2", "user3"]}

@app.get('/api/items')
def get_items():
    # Simulate some processing time
    time.sleep(0.1)
    return {"items": ["item1", "item2", "item3"]}

@app.get('/error')
def error():
    # Simulate an error
    raise ValueError("This is a test error")

# Additional endpoint to view metrics summary
@app.get('/metrics-summary')
def metrics_summary():
    # This provides a human-readable summary for debugging.
    # Use the public collect() API rather than private attributes; Counter
    # exposes both *_total and *_created samples, so keep only *_total.
    endpoint_data = {}
    for metric in REQUEST_COUNT.collect():
        for sample in metric.samples:
            if not sample.name.endswith('_total'):
                continue
            method = sample.labels['method']
            endpoint = sample.labels['endpoint']
            status = sample.labels['status']
            count = sample.value
            if endpoint not in endpoint_data:
                endpoint_data[endpoint] = {}
            if method not in endpoint_data[endpoint]:
                endpoint_data[endpoint][method] = {'total': 0, 'status_codes': {}}
            endpoint_data[endpoint][method]['total'] += count
            if status not in endpoint_data[endpoint][method]['status_codes']:
                endpoint_data[endpoint][method]['status_codes'][status] = 0
            endpoint_data[endpoint][method]['status_codes'][status] += count
    return endpoint_data
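
To see the middleware in action, generate a little traffic and then scrape /metrics. A minimal sketch with TestClient, run in the same module as the app above; note that prometheus_client appends _total to counter names in the exposition format:

from fastapi.testclient import TestClient

client = TestClient(app)

# Generate some traffic, then inspect the exposition output
client.get("/")
client.get("/api/users")
metrics_text = client.get("/metrics").text
assert "app_request_count_total" in metrics_text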