Cookbooks

FastAPI CRUD Endpoints with Pydantic Models

from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel
from typing import List, Optional
from uuid import UUID, uuid4

app = FastAPI()

class Item(BaseModel):
    i...
                    
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel
from typing import List, Optional
from uuid import UUID, uuid4

app = FastAPI()

class Item(BaseModel):
    """Request/response schema for an item handled by the CRUD endpoints."""

    # Identifier; optional on input because the endpoints assign it.
    id: Optional[UUID] = None
    # Required display name.
    name: str
    # Optional free-text description.
    description: Optional[str] = None
    # Required price.
    price: float
    # Defaults to True when omitted.
    is_active: bool = True

# In-memory storage for demo purposes: maps each item's UUID to its Item.
# Contents live only in this process and are lost when it exits.
items_db = {}

@app.post("/items/", response_model=Item, status_code=status.HTTP_201_CREATED)
def create_item(item: Item):
    """Store a new item under a freshly generated UUID and return it.

    Any ``id`` supplied in the request body is overwritten by the
    generated one.
    """
    new_id = uuid4()
    item.id = new_id
    items_db[new_id] = item
    return item

@app.get("/items/", response_model=List[Item])
def read_items(skip: int = 0, limit: int = 100):
    """Return a page of stored items.

    Args:
        skip: Index of the first item to return.
        limit: Maximum number of items to return.
    """
    all_items = list(items_db.values())
    stop = skip + limit
    return all_items[skip:stop]

@app.get("/items/{item_id}", response_model=Item)
def read_item(item_id: UUID):
    """Return the item stored under ``item_id``.

    Raises:
        HTTPException: 404 when no item has that id.
    """
    try:
        return items_db[item_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Item not found")

@app.put("/items/{item_id}", response_model=Item)
def update_item(item_id: UUID, item: Item):
    """Replace the item stored under ``item_id`` with the request body.

    The id from the path always wins over any id in the body.

    Raises:
        HTTPException: 404 when no item has that id.
    """
    if item_id in items_db:
        item.id = item_id
        items_db[item_id] = item
        return item
    raise HTTPException(status_code=404, detail="Item not found")

@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_item(item_id: UUID):
    """Remove the item stored under ``item_id``.

    Raises:
        HTTPException: 404 when no item has that id.
    """
    if item_id in items_db:
        del items_db[item_id]
        return None
    raise HTTPException(status_code=404, detail="Item not found")
SQLAlchemy ORM Session Management Context

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextman...
                    
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager
from typing import Iterator

# Database configuration: a SQLite file named test.db in the working directory.
DATABASE_URL = "sqlite:///./test.db"

# Create SQLAlchemy engine and session factory.
# check_same_thread=False is passed through to the SQLite driver so a
# connection may be used from a thread other than the one that created it.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# Sessions from this factory neither autocommit nor autoflush.
# NOTE(review): expire_on_commit is left at its default, so instances are
# expired when their session commits — confirm callers account for that.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create base class for ORM models
Base = declarative_base()

# Example model
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Unique, indexed login name.
    username = Column(String, unique=True, index=True)
    # Unique, indexed e-mail address.
    email = Column(String, unique=True, index=True)
    # Stored password value; the column name implies it should hold a hash.
    hashed_password = Column(String)

# Create all tables registered on Base's metadata. This runs as a side
# effect of importing this module.
Base.metadata.create_all(bind=engine)

@contextmanager
def get_db() -> Iterator[Session]:
    """Context manager that yields a database session and finalizes it.

    A new session is created from ``SessionLocal`` and yielded to the
    ``with`` block. When the block finishes normally the session is
    committed; if the block (or the commit itself) raises, the session is
    rolled back and the exception is re-raised. The session is closed in
    every case.

    Usage:
        with get_db() as db:
            db.query(User).all()
    """
    db = SessionLocal()
    try:
        yield db
        # Reached only when the with-block completed without raising.
        db.commit()
    except Exception:
        # Undo pending changes, then let the caller see the original error.
        db.rollback()
        raise
    finally:
        db.close()

# Example usage
def add_user(username: str, email: str, password: str) -> User:
    """Add a new user to the database and return it with its values loaded.

    The instance is removed from the session before the session commits.
    Otherwise the commit would expire its attributes (``SessionLocal``
    uses the default ``expire_on_commit``) and the closed session could no
    longer reload them, so reading e.g. ``user.id`` would fail.

    Args:
        username: Unique login name.
        email: Unique e-mail address.
        password: Value stored verbatim in ``hashed_password``.
            NOTE(review): nothing here hashes it — callers must pass an
            already-hashed value.

    Returns:
        The persisted ``User``, detached from any session, with its
        generated ``id`` populated.
    """
    with get_db() as db:
        user = User(username=username, email=email, hashed_password=password)
        db.add(user)
        db.flush()  # Emit the INSERT so the primary key is generated
        db.refresh(user)  # Load the stored column values onto the instance
        # Detach now so the commit on exit does not expire the loaded
        # values; the INSERT is already part of the transaction.
        db.expunge(user)
    return user
Redis Cache Decorator for Function Results

import functools
import json
import hashlib
import redis
import inspect
from typing import Any, Callable, Dict, Optional, Tuple, Union
from datetime import timedelta

# Redis client setup
redis_client...
                    
import functools
import json
import hashlib
import redis
import inspect
from typing import Any, Callable, Dict, Optional, Tuple, Union
from datetime import timedelta

# Redis client setup: module-level client shared by every cached function,
# pointing at database 0 of a Redis server on localhost:6379.
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def redis_cache(
    expire: Optional[int] = 300,  # Cache expiration in seconds
    prefix: str = "cache:",       # Cache key prefix
    serialize: Callable = json.dumps,
    deserialize: Callable = json.loads
):
    """Decorator to cache function results in Redis.

    On each call a key is derived from the function and its arguments. If
    Redis holds a value for that key it is deserialized and returned
    without calling the function; otherwise the function runs and its
    serialized result is stored.

    Args:
        expire: Time in seconds before cache expires (None for no expiration)
        prefix: Prefix for cache keys
        serialize: Function to serialize data before storing
        deserialize: Function to deserialize data after retrieval

    Returns:
        A decorator that wraps the target function with the caching logic.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Create a unique cache key based on function name and arguments
            cache_key = _create_cache_key(func, prefix, args, kwargs)

            # A missing key comes back as None. Compare against None
            # explicitly so a stored-but-empty payload (possible with a
            # custom serializer) still counts as a hit.
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return deserialize(cached_result)

            # If not in cache, call the function
            result = func(*args, **kwargs)

            # Store result in cache, with a TTL unless expiry is disabled
            serialized_result = serialize(result)
            if expire is not None:
                redis_client.setex(cache_key, expire, serialized_result)
            else:
                redis_client.set(cache_key, serialized_result)

            return result
        return wrapper
    return decorator

def _create_cache_key(func: Callable, prefix: str, args: Tuple, kwargs: Dict) -> str:
    """Create a unique cache key based on the function and its arguments."""
    # Get function signature including parameter names
    sig = inspect.signature(func)
    bound_args = sig.bind(*args, **kwargs)
    bound_args.apply_defaults()
    
    # Create a string representation of the arguments
    args_repr = json.dumps({
        k: v for k, v in bound_args.arguments.items() 
        if isinstance(v, (str, int, float, bool, list, dict, tuple)) or v is None
    }, sort_keys=True)
    
    # Create a hash of the function name and arguments
    key_parts = f"{func.__module__}.{func.__name__}:{args_repr}"
    key_hash = hashlib.md5(key_parts.encode()).hexdigest()
    
    return f"{prefix}{key_hash}"

# Example usage
@redis_cache(expire=60)
def get_user_data(user_id: int) -> Dict:
    """Return demo data for ``user_id``; results are cached for 60 seconds.

    The print runs only when the function body executes, i.e. on a cache
    miss.
    """
    # This would normally be an expensive database query
    print(f"Cache miss for user_id {user_id}")
    record = {
        "id": user_id,
        "name": f"User {user_id}",
        "email": f"user{user_id}@example.com",
    }
    return record
Background Task Queue with Celery

# tasks.py
from celery import Celery
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Celery configuration
celery_app = Celery(
    'ta...
                    
# tasks.py
from celery import Celery
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Celery configuration: the same local Redis database (localhost:6379/0)
# serves as both the message broker and the result backend.
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@celery_app.task(bind=True, max_retries=3)
def send_email(self, recipient, subject, body):
    """Send a plain-text email, retrying on delivery failure.

    Only the SMTP exchange is retried: connection, TLS, login and send
    errors are all ``OSError``/``SMTPException`` subclasses. Errors while
    building the message are programming errors and propagate at once.

    Args:
        recipient: Destination address.
        subject: Subject line.
        body: Plain-text body.

    Returns:
        ``{"status": "success", "recipient": recipient}`` on success.

    Raises:
        celery.exceptions.Retry: Raised by ``self.retry`` to reschedule
            the task; after ``max_retries`` the original error is raised.
    """
    # Placeholder credentials — replace with values loaded from secure
    # configuration before using this task for real.
    sender_email = "your-email@example.com"
    password = "your-app-password"  # Use app-specific password for better security

    # Create message
    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = recipient
    message["Subject"] = subject
    message.attach(MIMEText(body, "plain"))

    try:
        # Connect to SMTP server and send email
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, password)
            server.send_message(message)
    except (smtplib.SMTPException, OSError) as exc:
        # Retry with exponential backoff
        retry_delay = 60 * (2 ** self.request.retries)  # 60s, 120s, 240s
        # self.retry() signals the retry by raising; the explicit raise
        # makes clear that nothing after this line runs.
        raise self.retry(exc=exc, countdown=retry_delay)

    return {"status": "success", "recipient": recipient}

@celery_app.task
def process_data(data):
    """Simulate a slow job and report how many elements ``data`` holds.

    Returns:
        A dict with the element count under ``"processed"`` and a fixed
        ``"status"`` of ``"completed"``.
    """
    # Simulate long-running task
    time.sleep(5)
    item_count = len(data)
    return {"processed": item_count, "status": "completed"}

# In your application code:
def enqueue_email_task(recipient, subject, body):
    """Queue ``send_email`` for background execution.

    Returns:
        A dict carrying the id of the queued task under ``"task_id"``.
    """
    queued = send_email.delay(recipient, subject, body)
    task_id = queued.id
    return {"task_id": task_id}

def check_task_status(task_id):
    """Report the state of a task and, once it has succeeded, its result.

    The status is read from the result backend once and reused, so the
    reported ``"status"`` and the presence of ``"result"`` always agree
    even if the task changes state during the call.

    Args:
        task_id: Id of the task to look up.

    Returns:
        A dict with ``"task_id"`` and ``"status"``, plus ``"result"`` when
        the status is ``'SUCCESS'``.
    """
    task = celery_app.AsyncResult(task_id)
    current_status = task.status  # single backend read
    response = {
        "task_id": task_id,
        "status": current_status,
    }
    if current_status == 'SUCCESS':
        response["result"] = task.get()
    return response
JWT Authentication for FastAPI

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordReque...
                    
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Security configuration
# NOTE(review): the signing key is a hard-coded placeholder. Anyone who
# knows it can forge tokens, so load the real key from configuration that
# is not checked into source control.
SECRET_KEY = "your-secret-key-change-in-production"
# Algorithm used for both signing and verifying tokens.
ALGORITHM = "HS256"
# Lifetime of tokens issued by the /token route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing (bcrypt via passlib)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token dependency; tokenUrl names the route that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

# Models
class Token(BaseModel):
    """Response body of the ``/token`` route."""

    # Encoded JWT.
    access_token: str
    # Token scheme; the login route sets it to "bearer".
    token_type: str

class TokenData(BaseModel):
    """Identity extracted from a decoded token."""

    # Value of the token's "sub" claim.
    username: Optional[str] = None

class User(BaseModel):
    """Public user fields (no password hash)."""

    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    # When truthy, get_current_active_user rejects the user.
    disabled: Optional[bool] = None

class UserInDB(User):
    """User record as stored, including the password hash."""

    hashed_password: str

# Simulated user database
# Keyed by username; each value holds the fields of a UserInDB.
# The demo password "secret" is hashed when this module is imported.
users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
    }
}

# Helper functions
def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` matches ``hashed_password``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

def get_user(db, username: str):
    """Look up ``username`` in ``db``.

    Returns:
        A ``UserInDB`` built from the stored record, or ``None`` when the
        username is not present.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)

def authenticate_user(db, username: str, password: str):
    """Validate a username/password pair against ``db``.

    Returns:
        The matching user on success, otherwise ``False`` (unknown user
        or wrong password).
    """
    user = get_user(db, username)
    if user and verify_password(password, user.hashed_password):
        return user
    return False

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is untouched.
        expires_delta: Token lifetime. Only ``None`` selects the
            15-minute default — an explicit zero-length delta is honoured
            rather than silently replaced.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Local import: the module header only brings in datetime/timedelta.
    from datetime import timezone

    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    to_encode = data.copy()
    # Timezone-aware "now" in UTC; replaces the deprecated, naive
    # datetime.utcnow() while denoting the same instant.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a stored user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no
            ``sub`` claim, or names a user that does not exist.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject: str = claims.get("sub")
    if not subject:
        raise unauthorized
    token_data = TokenData(username=subject)

    user = get_user(users_db, token_data.username)
    if user:
        return user
    raise unauthorized

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return the authenticated user unless the account is disabled.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")

# Routes
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form credentials for a bearer token.

    Raises:
        HTTPException: 401 when the username or password is wrong.
    """
    user = authenticate_user(users_db, form_data.username, form_data.password)
    if user:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        claims = {"sub": user.username}
        access_token = create_access_token(data=claims, expires_delta=lifetime)
        return {"access_token": access_token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the profile of the authenticated, active user."""
    me = current_user
    return me
API Response Caching with FastAPI

from fastapi import FastAPI, Request, Response, Depends
from fastapi.responses import JSONResponse
import time
import hashlib
import json
from typing import Dict, Any, Callable, Optional
import functo...
                    
from fastapi import FastAPI, Request, Response, Depends
from fastapi.responses import JSONResponse
import time
import hashlib
import json
from typing import Dict, Any, Callable, Optional
import functools

app = FastAPI()

# Simple in-memory cache: maps a cache key to an entry dict holding the
# cached payload and the time it was stored. Lives only in this process.
cache_store: Dict[str, Dict[str, Any]] = {}

def _build_cache_key(request, vary_by_headers, vary_by_query):
    """Derive a cache key from the request path, query and chosen headers."""
    parts = [request.url.path]

    # Sort the query pairs so ?a=1&b=2 and ?b=2&a=1 share one entry.
    # As before, only one value per repeated parameter is considered.
    if vary_by_query and request.query_params:
        parts.append(repr(sorted(request.query_params.items())))

    for header in vary_by_headers or ():
        header_value = request.headers.get(header, '')
        if header_value:
            parts.append(f"{header}:{header_value}")

    # A separator keeps adjacent parts from running together, so different
    # part lists cannot produce the same joined string by accident.
    return hashlib.md5('\x00'.join(parts).encode()).hexdigest()


def cache_response(
    expire_after_seconds: int = 60,
    vary_by_headers: Optional[list] = None,
    vary_by_query: bool = True
):
    """Cache a FastAPI route's JSON output in ``cache_store``.

    The decorated handler must receive the ``Request`` (positionally or
    as the ``request`` keyword); without one, caching is skipped.

    Both kinds of handler result are cached:
    * response objects with a JSON ``body`` (stored with their status
      code, and tagged with ``X-Cache: MISS`` headers), and
    * plain JSON-serializable return values such as dicts and lists.

    Cache hits are returned as a ``JSONResponse`` with ``X-Cache: HIT``.
    NOTE(review): such a hit bypasses any ``response_model`` processing
    configured on the route.

    Args:
        expire_after_seconds: Time in seconds cache is valid
        vary_by_headers: List of headers to include in cache key
        vary_by_query: Whether to vary cache by query parameters
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract request from args or kwargs
            request = next((arg for arg in args if isinstance(arg, Request)),
                           kwargs.get('request'))
            if request is None:
                # If no request is found, just execute the function
                return await func(*args, **kwargs)

            cache_key = _build_cache_key(request, vary_by_headers, vary_by_query)
            current_time = time.time()

            cached_data = cache_store.get(cache_key)
            if cached_data is not None:
                if current_time - cached_data['timestamp'] < expire_after_seconds:
                    return JSONResponse(
                        content=cached_data['data'],
                        status_code=cached_data.get('status_code', 200),
                        headers={
                            'X-Cache': 'HIT',
                            'X-Cache-Timestamp': str(cached_data['timestamp'])
                        }
                    )
                # Expired: drop it so stale entries do not pile up
                cache_store.pop(cache_key, None)

            # Execute the route handler
            response = await func(*args, **kwargs)

            if hasattr(response, 'body'):
                # Response object: cache it only if the body is JSON
                try:
                    response_data = json.loads(response.body)
                except (json.JSONDecodeError, UnicodeDecodeError, TypeError):
                    return response
                cache_store[cache_key] = {
                    'data': response_data,
                    'status_code': getattr(response, 'status_code', 200),
                    'timestamp': current_time
                }
                # Add cache headers to the response
                response.headers['X-Cache'] = 'MISS'
                response.headers['X-Cache-Timestamp'] = str(current_time)
                return response

            # Plain return value: the JSON round-trip both checks that it
            # is serializable and snapshots it against later mutation.
            try:
                response_data = json.loads(json.dumps(response))
            except (TypeError, ValueError):
                return response
            cache_store[cache_key] = {
                'data': response_data,
                'status_code': 200,
                'timestamp': current_time
            }
            return response
        return wrapper
    return decorator

# Example usage
@app.get("/slow-data")
@cache_response(expire_after_seconds=30)
async def get_slow_data(request: Request):
    """Return demo data after a simulated two-second delay."""
    # Local import: asyncio is not among this module's top-level imports.
    import asyncio

    # Simulate slow operation. The await yields to the event loop, whereas
    # time.sleep() here would stall every other request for two seconds.
    await asyncio.sleep(2)
    return {"data": "This is slow data that's now cached", "time": time.time()}

@app.get("/user-specific")
@cache_response(expire_after_seconds=60, vary_by_headers=["Authorization"])
async def get_user_data(request: Request):
    """Return demo content cached separately per ``Authorization`` header."""
    generated_at = time.time()
    payload = {"user_data": "Personalized content", "time": generated_at}
    return payload
Boto3 S3 File Upload with Progress Tracking

import os
import sys
import threading
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError

class ProgressPercentage:
    """Track and display S3 uploa...
                    
import os
import sys
import threading
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError

class ProgressPercentage:
    """Track and display S3 upload progress.

    Instances are callables meant to be passed as a transfer ``Callback``:
    each call adds the reported byte count and prints the percentage when
    it has increased. A lock guards the counters because the callback may
    be invoked from several transfer threads.
    """
    def __init__(self, filename):
        """Record the size of ``filename`` as the 100% mark.

        Raises:
            OSError: If the file's size cannot be read.
        """
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self._prev_percentage = 0

    def __call__(self, bytes_amount):
        """Account for ``bytes_amount`` newly transferred bytes."""
        with self._lock:
            self._seen_so_far += bytes_amount
            if self._size > 0:
                percentage = int((self._seen_so_far / self._size) * 100)
            else:
                # Zero-byte file: nothing to divide by, and any callback
                # means the (empty) transfer is complete.
                percentage = 100

            # Only update display if percentage changes
            if percentage > self._prev_percentage:
                self._prev_percentage = percentage
                sys.stdout.write(
                    f"\r{self._filename} upload: {percentage}% complete")
                sys.stdout.flush()

def upload_file_to_s3(
    file_path,
    bucket_name,
    object_name=None,
    extra_args=None,
    multipart_threshold=8 * 1024 * 1024,  # 8MB
    multipart_chunksize=8 * 1024 * 1024,  # 8MB
    use_threads=True,
    max_concurrency=10
):
    """Upload a file to an S3 bucket with progress tracking.

    Args:
        file_path: File to upload
        bucket_name: S3 bucket name
        object_name: S3 object name (if None, uses file basename)
        extra_args: Extra arguments for S3 client
        multipart_threshold: Threshold for multipart upload
        multipart_chunksize: Size of multipart chunks
        use_threads: Whether to use threads for multipart upload
        max_concurrency: Maximum number of threads

    Returns:
        True if file was uploaded, False otherwise. Failure covers errors
        reported by S3 as well as a missing or unreadable local file.
    """
    # boto3 is already a dependency of this module; the exception type is
    # imported here because the module header does not bring it in.
    from boto3.exceptions import S3UploadFailedError

    # If object_name not specified, use the file's basename
    if object_name is None:
        object_name = os.path.basename(file_path)

    # Set up the transfer configuration for multipart uploads
    config = TransferConfig(
        multipart_threshold=multipart_threshold,
        multipart_chunksize=multipart_chunksize,
        use_threads=use_threads,
        max_concurrency=max_concurrency
    )

    # Create S3 client
    s3_client = boto3.client('s3')

    try:
        # Reads the file size, so a missing file surfaces here as OSError
        progress_callback = ProgressPercentage(file_path)

        # Upload the file with progress tracking
        s3_client.upload_file(
            file_path,
            bucket_name,
            object_name,
            ExtraArgs=extra_args,
            Callback=progress_callback,
            Config=config
        )
    except (ClientError, S3UploadFailedError, OSError) as e:
        # upload_file reports a failed transfer as S3UploadFailedError, so
        # catching ClientError alone would let those failures escape.
        print(f"\nError uploading {file_path}: {e}")
        return False

    print(f"\nUpload complete: {file_path} -> s3://{bucket_name}/{object_name}")
    return True

# Example usage
def main():
    """Run a sample upload of a video with a content type and public ACL."""
    # Example file upload with content type
    s3_options = {
        'ContentType': 'video/mp4',
        'ACL': 'public-read'
    }
    upload_file_to_s3(
        file_path="large_video.mp4",
        bucket_name="my-media-bucket",
        object_name="videos/large_video.mp4",
        extra_args=s3_options
    )

# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Prometheus Metrics for FastAPI

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses ...
                    
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time

app = FastAPI()

# Add CORS middleware
# NOTE(review): every origin, method and header is allowed AND credentials
# are enabled. That combination is very permissive — restrict
# allow_origins to the known front-end origins before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Define Prometheus metrics
# Number of handled requests, labelled by HTTP method, request path and
# response status code.
REQUEST_COUNT = Counter(
    'app_request_count',
    'Application request count',
    ['method', 'endpoint', 'status']
)

# Request handling time in seconds, labelled by HTTP method and request
# path.
REQUEST_LATENCY = Histogram(
    'app_request_latency_seconds',
    'Application request latency',
    ['method', 'endpoint']
)

# Middleware to track request metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Record latency and request count for every request.

    Metrics are written in a ``finally`` block, so requests whose handler
    raises are counted too (with status 500) before the exception
    continues to propagate.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request down the stack.

    Returns:
        The downstream response, unchanged.
    """
    # perf_counter is monotonic, so the measured duration is unaffected by
    # system clock adjustments.
    start_time = time.perf_counter()
    # Assume failure until a response is actually produced.
    status_code = 500
    try:
        response = await call_next(request)
        status_code = response.status_code
        return response
    finally:
        latency = time.perf_counter() - start_time
        endpoint = request.url.path

        # Track latency for this endpoint
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=endpoint
        ).observe(latency)

        # Track request count for this endpoint and status code
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status=status_code
        ).inc()

# Expose Prometheus metrics
@app.get('/metrics')
def metrics():
    """Serve the current metrics in the Prometheus exposition format."""
    payload = generate_latest()
    return Response(payload, media_type=CONTENT_TYPE_LATEST)

# Example endpoints
@app.get('/')
def home():
    """Return a static welcome message."""
    greeting = {"message": "Welcome to the API"}
    return greeting

@app.get('/api/users')
def get_users():
    """Return a fixed list of user names after a short simulated delay."""
    # Simulate some processing time
    time.sleep(0.2)
    user_names = ["user1", "user2", "user3"]
    return {"users": user_names}

@app.get('/api/items')
def get_items():
    """Return a fixed list of item names after a short simulated delay."""
    # Simulate some processing time
    time.sleep(0.1)
    item_names = ["item1", "item2", "item3"]
    return {"items": item_names}

@app.get('/error')
def error():
    """Always fail, to exercise error handling.

    Raises:
        ValueError: On every call.
    """
    # Simulate an error
    failure = ValueError("This is a test error")
    raise failure

# Additional endpoint to view metrics summary
@app.get('/metrics-summary')
def metrics_summary():
    """Return a human-readable summary of request counts for debugging.

    Reads the counter through its public ``collect()`` API rather than
    the private ``_metrics`` attribute, which is not callable.

    Returns:
        A nested dict of the form
        ``{endpoint: {method: {'total': n, 'status_codes': {status: n}}}}``.
    """
    endpoint_data = {}

    for metric in REQUEST_COUNT.collect():
        for sample in metric.samples:
            # A counter also exposes a "_created" timestamp sample per
            # label set; only the "_total" samples carry the count.
            if not sample.name.endswith('_total'):
                continue

            labels = sample.labels
            status = labels['status']
            count = sample.value

            per_method = endpoint_data.setdefault(labels['endpoint'], {}).setdefault(
                labels['method'], {'total': 0, 'status_codes': {}}
            )
            per_method['total'] += count
            status_codes = per_method['status_codes']
            status_codes[status] = status_codes.get(status, 0) + count

    return endpoint_data