Chào mừng bạn đến với Bài 7 của khoá học Python FastAPI. Đây là một trong những bài quan trọng nhất trong khoá học. Ba công cụ mà chúng ta học hôm nay — Dependency Injection, Middleware, và Background Tasks — là thứ phân biệt giữa một API “chạy được” và một API “production-ready”.
Nếu Bài 5-6 dạy bạn viết từng endpoint, thì bài này dạy bạn xử lý các vấn đề cắt ngang: làm sao authentication một lần dùng cho cả trăm endpoint, làm sao log mọi request tự động, làm sao gửi email sau khi response đã trả về cho client mà không bắt họ chờ.
Tiếp tục xây dựng TaskAPI, chúng ta sẽ thêm các tính năng thực tế như logging, request ID tracking, database session management, và email notification. 🚀
1. Dependency Injection — Tư duy cốt lõi
- Dependency Injection (DI) nghe có vẻ academic, nhưng ý tưởng cực kỳ đơn giản: thay vì function tự tạo ra thứ nó cần, nó nhận thứ đó từ bên ngoài truyền vào.
- Tại sao quan trọng? Vì nó giải quyết 3 vấn đề lớn: reusability (viết một lần, dùng nhiều nơi), testability (dễ mock trong test), và separation of concerns (business logic tách biệt với infrastructure).
Ở Bài 6 bạn đã gặp một dependency đơn giản (get_task_or_404). Giờ hãy đi sâu hơn.
2. Dependency cơ bản
Một dependency trong FastAPI chỉ đơn giản là một callable — function hoặc class — được dùng với Depends():
from fastapi import Depends, FastAPI, Query
from typing import Optional
app = FastAPI()
# Dependency: xử lý pagination params dùng chung
def pagination_params(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
) -> dict:
return {
"page": page,
"page_size": page_size,
"offset": (page - 1) * page_size,
}
@app.get("/tasks")
def list_tasks(pagination: dict = Depends(pagination_params)):
offset = pagination["offset"]
limit = pagination["page_size"]
return {"offset": offset, "limit": limit}
@app.get("/users")
def list_users(pagination: dict = Depends(pagination_params)):
# Cùng pagination logic, không copy-paste
...
FastAPI làm phần việc “thần kỳ”: nó đọc signature của pagination_params, thấy page và page_size là Query params, và tự thêm chúng vào Swagger cho mỗi endpoint dùng dependency này. Tài liệu luôn chính xác, không cần nhắc lại.
3. Dependency lồng nhau (Sub-dependencies)
Dependency có thể phụ thuộc vào dependency khác. FastAPI tự động resolve chuỗi này:
from fastapi import Depends, Header, HTTPException
def get_api_key(x_api_key: str = Header(...)) -> str:
if not x_api_key.startswith("sk_"):
raise HTTPException(401, "API key không hợp lệ")
return x_api_key
def get_current_user(api_key: str = Depends(get_api_key)) -> dict:
user = find_user_by_api_key(api_key)
if not user:
raise HTTPException(401, "User không tồn tại")
return user
def verify_admin(user: dict = Depends(get_current_user)) -> dict:
if user["role"] != "admin":
raise HTTPException(403, "Yêu cầu quyền admin")
return user
@app.delete("/tasks/{task_id}")
def delete_task(task_id: int, admin: dict = Depends(verify_admin)):
# admin có sẵn vì đã qua: get_api_key → get_current_user → verify_admin
return {"deleted": task_id}
Chuỗi dependency rõ ràng và có thể tái sử dụng: get_current_user dùng cho endpoint cần auth thường, verify_admin chỉ cho admin-only. Không copy-paste code auth ở mọi endpoint.
4. Class-based Dependencies
Khi dependency phức tạp, dùng class rõ ràng hơn function:
from fastapi import Depends, Query
from typing import Optional
class TaskFilter:
def __init__(
self,
status: Optional[str] = Query(None),
priority: Optional[str] = Query(None),
search: Optional[str] = Query(None, min_length=2),
tags: list[str] = Query([]),
):
self.status = status
self.priority = priority
self.search = search
self.tags = tags
def apply(self, tasks: list) -> list:
result = tasks
if self.status:
result = [t for t in result if t["status"] == self.status]
if self.priority:
result = [t for t in result if t["priority"] == self.priority]
if self.search:
s = self.search.lower()
result = [t for t in result if s in t["title"].lower()]
if self.tags:
result = [t for t in result if set(self.tags) & set(t["tags"])]
return result
@app.get("/tasks")
def list_tasks(filters: TaskFilter = Depends()):
# Chú ý: Depends() không có argument - FastAPI hiểu là dùng TaskFilter
all_tasks = get_all_tasks()
return filters.apply(all_tasks)
Class-based dependency tốt khi bạn cần bundle nhiều params liên quan và có các method xử lý logic liên quan đến chúng.
5. yield — Dependency có setup/teardown
Đây là một trong những tính năng mạnh nhất của FastAPI DI. Khi dependency cần dọn dẹp tài nguyên (đóng DB connection, close file, rollback transaction…), dùng yield:
from sqlalchemy.orm import Session
from app.database import SessionLocal
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/tasks/{task_id}")
def get_task(task_id: int, db: Session = Depends(get_db)):
return db.query(Task).filter(Task.id == task_id).first()
Flow thực thi: FastAPI gọi get_db() → nhận db từ yield → truyền vào endpoint → endpoint chạy xong → FastAPI quay lại chạy phần sau yield (tức db.close()).
Code trong finally luôn chạy, kể cả khi endpoint raise exception — đảm bảo không bao giờ leak connection.
yield với exception handling
def get_db():
db = SessionLocal()
try:
yield db
db.commit() # Commit nếu không có exception
except Exception:
db.rollback() # Rollback nếu có lỗi
raise
finally:
db.close()
Pattern này — transaction tự động theo request lifecycle — là cách làm chuẩn trong hầu hết production app với SQLAlchemy. Chúng ta sẽ dùng lại nó ở Bài 10.
6. Dependencies không truyền giá trị (path operation dependencies)
Đôi khi bạn cần chạy dependency (để side effect, ví dụ check permission) nhưng không cần giá trị trả về của nó:
def verify_api_key(x_api_key: str = Header(...)):
if x_api_key != "secret":
raise HTTPException(401, "Invalid API key")
# Không return gì
# Chạy dependency nhưng không bind vào param nào
@app.get("/admin/stats", dependencies=[Depends(verify_api_key)])
def get_stats():
return {"users": 100, "tasks": 500}
7. Global Dependencies
Áp dụng dependency cho mọi endpoint trong app hoặc router:
from fastapi import FastAPI, Depends
# Toàn bộ app
app = FastAPI(dependencies=[Depends(verify_api_key)])
# Hoặc chỉ cho một router
from fastapi import APIRouter
admin_router = APIRouter(
prefix="/admin",
dependencies=[Depends(verify_admin)],
)
@admin_router.get("/users")
def list_all_users():
# verify_admin tự động được gọi
return {...}
@admin_router.delete("/tasks/{id}")
def force_delete(id: int):
# verify_admin tự động được gọi
return {...}
8. Dependency Caching
Mặc định, FastAPI cache kết quả của dependency trong cùng một request. Nếu nhiều sub-dependency cùng phụ thuộc get_current_user, nó chỉ được gọi một lần:
def get_current_user(...):
print("Query DB...")
return user
def dep_a(user = Depends(get_current_user)): ...
def dep_b(user = Depends(get_current_user)): ...
@app.get("/endpoint")
def handler(
a = Depends(dep_a),
b = Depends(dep_b),
):
# "Query DB..." chỉ in MỘT LẦN mặc dù dùng 2 nơi
...
# Nếu muốn bỏ cache:
@app.get("/endpoint2")
def handler2(user = Depends(get_current_user, use_cache=False)):
...
Caching này ở scope request — không phải ở scope application. Mỗi request mới, cache bị reset.
9. Middleware — Logic chạy cho mọi request
Middleware là code chạy trước và sau mỗi request. Nó thấy mọi thứ đi qua app. Dùng cho: logging, request ID tracking, CORS, rate limiting, performance monitoring, compression…
Middleware đơn giản nhất
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
# call_next chuyển request xuống handler tiếp theo
response = await call_next(request)
# Code ở đây chạy SAU KHI endpoint đã xử lý xong
process_time = time.time() - start_time
response.headers["X-Process-Time"] = f"{process_time:.4f}"
return response
Middleware này thêm header X-Process-Time cho mọi response — cực kỳ hữu ích để phát hiện endpoint chậm.
Request logging middleware
import logging
import uuid
logger = logging.getLogger("taskapi")
@app.middleware("http")
async def log_requests(request: Request, call_next):
request_id = str(uuid.uuid4())
# Log request
logger.info(
f"[{request_id}] {request.method} {request.url.path} "
f"from {request.client.host}"
)
start = time.time()
try:
response = await call_next(request)
except Exception as e:
logger.exception(f"[{request_id}] Lỗi không xử lý được: {e}")
raise
duration = time.time() - start
logger.info(
f"[{request_id}] Response {response.status_code} "
f"trong {duration:.3f}s"
)
# Thêm request ID vào response để debug
response.headers["X-Request-ID"] = request_id
return response
Bây giờ mọi request đều có ID duy nhất. Khi có bug, user chỉ cần đưa bạn X-Request-ID là bạn tìm được toàn bộ log liên quan trong vài giây.
10. Middleware class-based
Với middleware phức tạp hơn, dùng class:
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app: ASGIApp, requests_per_minute: int = 60):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self._counters: dict[str, list[float]] = {}
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
now = time.time()
# Lấy timestamps của 1 phút gần nhất
timestamps = self._counters.get(client_ip, [])
timestamps = [t for t in timestamps if now - t < 60]
if len(timestamps) >= self.requests_per_minute:
from fastapi.responses import JSONResponse
return JSONResponse(
status_code=429,
content={"detail": "Too many requests"},
headers={"Retry-After": "60"},
)
timestamps.append(now)
self._counters[client_ip] = timestamps
return await call_next(request)
app.add_middleware(RateLimitMiddleware, requests_per_minute=100)
Chú ý: implementation này chỉ là demo. Production cần lưu counter vào Redis để hoạt động với nhiều server instance.
11. Các middleware có sẵn của FastAPI
FastAPI và Starlette có một số middleware built-in rất hữu ích:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
app = FastAPI()
# CORS - cho phép cross-origin requests
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Gzip - nén response > 1KB để giảm băng thông
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Chỉ chấp nhận request từ các host được whitelist
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "*.example.com"],
)
# Redirect HTTP → HTTPS (cho production)
app.add_middleware(HTTPSRedirectMiddleware)
12. Thứ tự middleware quan trọng
Middleware được thực thi theo thứ tự ngược lại so với khi bạn add:
app.add_middleware(MiddlewareA) # Add đầu tiên
app.add_middleware(MiddlewareB)
app.add_middleware(MiddlewareC) # Add cuối cùng
# Request flow:
# Request → C → B → A → Endpoint → A → B → C → Response
# (C là outer nhất, A là inner nhất)
Tức là middleware add cuối cùng sẽ chạy đầu tiên khi request đi vào, và cuối cùng khi response đi ra. Nhớ điều này khi sắp xếp các middleware phụ thuộc vào nhau.
13. Middleware vs Dependencies — Khi nào dùng gì?
Hai công cụ có vẻ giống nhau nhưng khác biệt quan trọng:
- Middleware: áp dụng cho MỌI request, kể cả static files, không biết endpoint nào sẽ được gọi. Nhận
Requestraw, không có access vào các param đã được parse. Thích hợp cho: logging, CORS, compression, request ID. - Dependencies: áp dụng có chọn lọc (per-endpoint hoặc per-router). Chạy sau khi FastAPI đã parse path/query/body. Có thể trả về giá trị cho endpoint dùng. Thích hợp cho: authentication, permission check, DB session, business logic chung.
Quy tắc đơn giản: nếu logic liên quan đến nội dung cụ thể của endpoint (user, permission, data), dùng dependency. Nếu là infrastructure chung cho mọi request, dùng middleware.
14. Background Tasks — Trả response ngay, làm việc sau
Đôi khi sau khi xử lý request, bạn cần làm việc gì đó tốn thời gian nhưng không nhất thiết client phải chờ: gửi email xác nhận, sinh report, gửi webhook, update search index…
Background tasks cho phép bạn trả response cho client ngay lập tức, và code background chạy sau đó.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def send_welcome_email(email: str, name: str):
# Giả lập gửi email - mất 3 giây
import time
time.sleep(3)
print(f"Đã gửi email chào mừng tới {email}")
def log_user_action(user_id: int, action: str):
with open("audit.log", "a") as f:
f.write(f"{user_id}: {action}\n")
@app.post("/users")
def create_user(
email: str,
name: str,
background_tasks: BackgroundTasks,
):
user = save_user(email, name)
# Đăng ký task chạy SAU KHI response được gửi
background_tasks.add_task(send_welcome_email, email, name)
background_tasks.add_task(log_user_action, user["id"], "created")
return {"id": user["id"], "email": email}
# Client nhận response ngay, không phải chờ 3 giây cho email
15. Background tasks trong dependencies
Bạn có thể đăng ký background task từ bên trong dependency — rất tiện cho audit logging:
def audit_logger(
request: Request,
background_tasks: BackgroundTasks,
):
def log_request():
logger.info(
f"Audit: {request.method} {request.url.path} "
f"at {datetime.utcnow()}"
)
background_tasks.add_task(log_request)
@app.post("/tasks", dependencies=[Depends(audit_logger)])
def create_task(task: TaskCreate):
# audit tự động được log sau response
...
16. Giới hạn của BackgroundTasks
Đây là phần cực kỳ quan trọng mà nhiều người bỏ qua:
- Background tasks chạy trong cùng process với FastAPI app. Nếu task ngốn nhiều CPU/memory, nó làm chậm cả server. Nếu server restart, task đang chạy bị mất.
- Dùng BackgroundTasks khi: task ngắn (vài giây), không quan trọng nếu thỉnh thoảng thất bại, không cần retry, ví dụ: gửi email thông báo, log audit, cleanup file tạm.
- Đừng dùng khi: task dài (vài phút trở lên), cần retry khi thất bại, cần chạy định kỳ, task quan trọng không được mất. Lúc đó cần task queue thực sự.
17. Khi nào cần task queue (Celery, RQ, Dramatiq)
Với các task “nghiêm túc”, dùng task queue chuyên dụng:
# Với Celery
from celery import Celery
celery_app = Celery("taskapi", broker="redis://localhost:6379/0")
@celery_app.task(bind=True, max_retries=3)
def generate_report(self, task_id: int, user_email: str):
try:
# Xử lý tốn thời gian (vài phút)
report = build_huge_report(task_id)
send_email(user_email, report)
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
# Trong FastAPI endpoint
@app.post("/tasks/{task_id}/report")
def request_report(task_id: int, user_email: str):
# Enqueue vào Celery, chạy ở worker process riêng
generate_report.delay(task_id, user_email)
return {"message": "Report đang được tạo, sẽ gửi qua email khi xong"}
So sánh nhanh:
- BackgroundTasks: built-in, đơn giản, không cần infrastructure thêm. Phù hợp task ngắn, không critical.
- Celery/RQ/Dramatiq: cần Redis/RabbitMQ, phức tạp hơn. Phù hợp task dài, cần retry, cần monitoring, cần scaling worker độc lập với API.
Khoá học này tập trung vào BackgroundTasks. Khi dự án của bạn scale đến mức cần task queue, Celery là lựa chọn phổ biến nhất trong Python.
18. Lifespan Events — Startup và Shutdown
Ngoài per-request hooks, bạn có thể chạy code khi app khởi động và tắt:
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# === Startup ===
print("App đang khởi động...")
app.state.db_pool = await create_db_pool()
app.state.redis = await connect_redis()
print("Sẵn sàng nhận request")
yield # App chạy ở đây
# === Shutdown ===
print("App đang tắt...")
await app.state.db_pool.close()
await app.state.redis.close()
print("Cleanup xong")
app = FastAPI(lifespan=lifespan)
Lifespan rất quan trọng cho:
Tạo connection pool (DB, Redis, HTTP client) một lần, dùng cho cả vòng đời app — không phải tạo mới mỗi request. Load model ML vào memory lúc start, không phải load lại mỗi request. Đóng connection và file handle khi shutdown để tránh data loss.
19. Ví dụ tổng hợp: Tất cả cùng nhau
Cập nhật TaskAPI với mọi thứ vừa học:
# app/main.py
import time
import uuid
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from app.routers import tasks
from app.config import settings
logger = logging.getLogger("taskapi")
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("TaskAPI khởi động")
# app.state.db = await create_pool()
yield
logger.info("TaskAPI tắt")
# await app.state.db.close()
app = FastAPI(
title="TaskAPI",
version="0.1.0",
lifespan=lifespan,
)
# === Middlewares ===
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.middleware("http")
async def request_context(request: Request, call_next):
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
start = time.time()
logger.info(f"[{request_id}] {request.method} {request.url.path}")
try:
response = await call_next(request)
except Exception:
logger.exception(f"[{request_id}] Unhandled exception")
raise
duration = time.time() - start
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = f"{duration:.4f}"
logger.info(
f"[{request_id}] {response.status_code} ({duration:.3f}s)"
)
return response
# === Routers ===
app.include_router(tasks.router, prefix="/api/v1")
# app/routers/tasks.py
from fastapi import APIRouter, BackgroundTasks, Depends, status
from app.models.task import TaskCreate, TaskResponse
from app.dependencies import get_current_user, pagination_params
from app.services.notifications import notify_task_created
router = APIRouter(prefix="/tasks", tags=["tasks"])
@router.post(
"/",
response_model=TaskResponse,
status_code=status.HTTP_201_CREATED,
)
def create_task(
task: TaskCreate,
background_tasks: BackgroundTasks,
user: dict = Depends(get_current_user),
):
new_task = save_task(task, owner_id=user["id"])
# Chạy background: gửi notification, update search index
background_tasks.add_task(notify_task_created, new_task, user["email"])
background_tasks.add_task(update_search_index, new_task)
return new_task
@router.get("/")
def list_tasks(
user: dict = Depends(get_current_user),
pagination: dict = Depends(pagination_params),
):
return get_user_tasks(user["id"], **pagination)
20. Bài tập
Mở rộng TaskAPI của bạn:
- Viết dependency
require_permission(permission: str)dùng pattern dependency-with-parameters (gợi ý: class-based hoặc closure). - Tạo middleware
CorrelationIDMiddlewaređọcX-Correlation-IDtừ request, generate nếu chưa có, truyền vào response. Implement background task gửi “task due reminder” khi có task được tạo vớidue_date. - Chuyển hardcoded DB dict sang dùng dependency
get_db()với yield pattern (chuẩn bị cho Bài 10). - Thêm middleware log ra file với format JSON (mỗi dòng là 1 JSON object, dễ parse bằng tools phân tích log).
Tổng kết
Đây là một trong những bài “nặng” nhất của khoá học, và bạn vừa nắm được các kỹ thuật quan trọng nhất của FastAPI:
- Dependency Injection: công cụ mạnh nhất để tổ chức code — từ dependency đơn giản đến sub-dependencies, class-based, yield pattern cho resource management, và global dependencies.
- Middleware: xử lý cross-cutting concerns cho mọi request — logging, CORS, compression, rate limiting. Hiểu rõ khi nào dùng middleware vs dependency.
- Background Tasks: trả response nhanh cho client, xử lý việc tốn thời gian sau. Biết giới hạn và khi nào cần chuyển sang task queue thực sự.
- Lifespan events: quản lý resource ở vòng đời app.
Những pattern này sẽ xuất hiện lặp đi lặp lại trong mọi project FastAPI của bạn. Hãy luyện tập cho đến khi chúng trở thành “phản xạ”.
Ở Bài 8 tiếp theo, chúng ta sẽ tập trung vào một chủ đề rất thực tế: xử lý lỗi. Làm sao để custom exception handlers, thiết kế format lỗi thống nhất cho cả API, xử lý validation errors đẹp hơn, và các pattern xử lý lỗi trong production.
