Chào mừng bạn đến với Bài 7 của khoá học Python FastAPI. Đây là một trong những bài quan trọng nhất trong khoá học. Ba công cụ mà chúng ta học hôm nay — Dependency Injection, Middleware, và Background Tasks — là thứ phân biệt giữa một API “chạy được” và một API “production-ready”.

Nếu Bài 5-6 dạy bạn viết từng endpoint, thì bài này dạy bạn xử lý các vấn đề cắt ngang: làm sao authentication một lần dùng cho cả trăm endpoint, làm sao log mọi request tự động, làm sao gửi email sau khi response đã trả về cho client mà không bắt họ chờ.

Tiếp tục xây dựng TaskAPI, chúng ta sẽ thêm các tính năng thực tế như logging, request ID tracking, database session management, và email notification. 🚀

1. Dependency Injection — Tư duy cốt lõi

  • Dependency Injection (DI) nghe có vẻ academic, nhưng ý tưởng cực kỳ đơn giản: thay vì function tự tạo ra thứ nó cần, nó nhận thứ đó từ bên ngoài truyền vào.
  • Tại sao quan trọng? Vì nó giải quyết 3 vấn đề lớn: reusability (viết một lần, dùng nhiều nơi), testability (dễ mock trong test), và separation of concerns (business logic tách biệt với infrastructure).

Ở Bài 6 bạn đã gặp một dependency đơn giản (get_task_or_404). Giờ hãy đi sâu hơn.

2. Dependency cơ bản

Một dependency trong FastAPI chỉ đơn giản là một callable — function hoặc class — được dùng với Depends():

from fastapi import Depends, FastAPI, Query
from typing import Optional

app = FastAPI()

# Dependency: xử lý pagination params dùng chung
def pagination_params(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
) -> dict:
    return {
        "page": page,
        "page_size": page_size,
        "offset": (page - 1) * page_size,
    }

@app.get("/tasks")
def list_tasks(pagination: dict = Depends(pagination_params)):
    offset = pagination["offset"]
    limit = pagination["page_size"]
    return {"offset": offset, "limit": limit}

@app.get("/users")
def list_users(pagination: dict = Depends(pagination_params)):
    # Cùng pagination logic, không copy-paste
    ...

FastAPI làm phần việc “thần kỳ”: nó đọc signature của pagination_params, thấy pagepage_size là Query params, và tự thêm chúng vào Swagger cho mỗi endpoint dùng dependency này. Tài liệu luôn chính xác, không cần nhắc lại.

3. Dependency lồng nhau (Sub-dependencies)

Dependency có thể phụ thuộc vào dependency khác. FastAPI tự động resolve chuỗi này:

from fastapi import Depends, Header, HTTPException

def get_api_key(x_api_key: str = Header(...)) -> str:
    if not x_api_key.startswith("sk_"):
        raise HTTPException(401, "API key không hợp lệ")
    return x_api_key

def get_current_user(api_key: str = Depends(get_api_key)) -> dict:
    user = find_user_by_api_key(api_key)
    if not user:
        raise HTTPException(401, "User không tồn tại")
    return user

def verify_admin(user: dict = Depends(get_current_user)) -> dict:
    if user["role"] != "admin":
        raise HTTPException(403, "Yêu cầu quyền admin")
    return user

@app.delete("/tasks/{task_id}")
def delete_task(task_id: int, admin: dict = Depends(verify_admin)):
    # admin có sẵn vì đã qua: get_api_key → get_current_user → verify_admin
    return {"deleted": task_id}

Chuỗi dependency rõ ràng và có thể tái sử dụng: get_current_user dùng cho endpoint cần auth thường, verify_admin chỉ cho admin-only. Không copy-paste code auth ở mọi endpoint.

4. Class-based Dependencies

Khi dependency phức tạp, dùng class rõ ràng hơn function:

from fastapi import Depends, Query
from typing import Optional

class TaskFilter:
    def __init__(
        self,
        status: Optional[str] = Query(None),
        priority: Optional[str] = Query(None),
        search: Optional[str] = Query(None, min_length=2),
        tags: list[str] = Query([]),
    ):
        self.status = status
        self.priority = priority
        self.search = search
        self.tags = tags

    def apply(self, tasks: list) -> list:
        result = tasks
        if self.status:
            result = [t for t in result if t["status"] == self.status]
        if self.priority:
            result = [t for t in result if t["priority"] == self.priority]
        if self.search:
            s = self.search.lower()
            result = [t for t in result if s in t["title"].lower()]
        if self.tags:
            result = [t for t in result if set(self.tags) & set(t["tags"])]
        return result


@app.get("/tasks")
def list_tasks(filters: TaskFilter = Depends()):
    # Chú ý: Depends() không có argument - FastAPI hiểu là dùng TaskFilter
    all_tasks = get_all_tasks()
    return filters.apply(all_tasks)

Class-based dependency tốt khi bạn cần bundle nhiều params liên quan và có các method xử lý logic liên quan đến chúng.

5. yield — Dependency có setup/teardown

Đây là một trong những tính năng mạnh nhất của FastAPI DI. Khi dependency cần dọn dẹp tài nguyên (đóng DB connection, close file, rollback transaction…), dùng yield:

from sqlalchemy.orm import Session
from app.database import SessionLocal

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/tasks/{task_id}")
def get_task(task_id: int, db: Session = Depends(get_db)):
    return db.query(Task).filter(Task.id == task_id).first()

Flow thực thi: FastAPI gọi get_db() → nhận db từ yield → truyền vào endpoint → endpoint chạy xong → FastAPI quay lại chạy phần sau yield (tức db.close()).

Code trong finally luôn chạy, kể cả khi endpoint raise exception — đảm bảo không bao giờ leak connection.

yield với exception handling

def get_db():
    db = SessionLocal()
    try:
        yield db
        db.commit()  # Commit nếu không có exception
    except Exception:
        db.rollback()  # Rollback nếu có lỗi
        raise
    finally:
        db.close()

Pattern này — transaction tự động theo request lifecycle — là cách làm chuẩn trong hầu hết production app với SQLAlchemy. Chúng ta sẽ dùng lại nó ở Bài 10.

6. Dependencies không truyền giá trị (path operation dependencies)

Đôi khi bạn cần chạy dependency (để side effect, ví dụ check permission) nhưng không cần giá trị trả về của nó:

def verify_api_key(x_api_key: str = Header(...)):
    if x_api_key != "secret":
        raise HTTPException(401, "Invalid API key")
    # Không return gì

# Chạy dependency nhưng không bind vào param nào
@app.get("/admin/stats", dependencies=[Depends(verify_api_key)])
def get_stats():
    return {"users": 100, "tasks": 500}

7. Global Dependencies

Áp dụng dependency cho mọi endpoint trong app hoặc router:

from fastapi import FastAPI, Depends

# Toàn bộ app
app = FastAPI(dependencies=[Depends(verify_api_key)])

# Hoặc chỉ cho một router
from fastapi import APIRouter
admin_router = APIRouter(
    prefix="/admin",
    dependencies=[Depends(verify_admin)],
)

@admin_router.get("/users")
def list_all_users():
    # verify_admin tự động được gọi
    return {...}

@admin_router.delete("/tasks/{id}")
def force_delete(id: int):
    # verify_admin tự động được gọi
    return {...}

8. Dependency Caching

Mặc định, FastAPI cache kết quả của dependency trong cùng một request. Nếu nhiều sub-dependency cùng phụ thuộc get_current_user, nó chỉ được gọi một lần:

def get_current_user(...):
    print("Query DB...")
    return user

def dep_a(user = Depends(get_current_user)): ...
def dep_b(user = Depends(get_current_user)): ...

@app.get("/endpoint")
def handler(
    a = Depends(dep_a),
    b = Depends(dep_b),
):
    # "Query DB..." chỉ in MỘT LẦN mặc dù dùng 2 nơi
    ...

# Nếu muốn bỏ cache:
@app.get("/endpoint2")
def handler2(user = Depends(get_current_user, use_cache=False)):
    ...

Caching này ở scope request — không phải ở scope application. Mỗi request mới, cache bị reset.

9. Middleware — Logic chạy cho mọi request

Middleware là code chạy trướcsau mỗi request. Nó thấy mọi thứ đi qua app. Dùng cho: logging, request ID tracking, CORS, rate limiting, performance monitoring, compression…

Middleware đơn giản nhất

import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()

    # call_next chuyển request xuống handler tiếp theo
    response = await call_next(request)

    # Code ở đây chạy SAU KHI endpoint đã xử lý xong
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = f"{process_time:.4f}"
    return response

Middleware này thêm header X-Process-Time cho mọi response — cực kỳ hữu ích để phát hiện endpoint chậm.

Request logging middleware

import logging
import uuid

logger = logging.getLogger("taskapi")

@app.middleware("http")
async def log_requests(request: Request, call_next):
    request_id = str(uuid.uuid4())

    # Log request
    logger.info(
        f"[{request_id}] {request.method} {request.url.path} "
        f"from {request.client.host}"
    )

    start = time.time()
    try:
        response = await call_next(request)
    except Exception as e:
        logger.exception(f"[{request_id}] Lỗi không xử lý được: {e}")
        raise

    duration = time.time() - start
    logger.info(
        f"[{request_id}] Response {response.status_code} "
        f"trong {duration:.3f}s"
    )

    # Thêm request ID vào response để debug
    response.headers["X-Request-ID"] = request_id
    return response

Bây giờ mọi request đều có ID duy nhất. Khi có bug, user chỉ cần đưa bạn X-Request-ID là bạn tìm được toàn bộ log liên quan trong vài giây.

10. Middleware class-based

Với middleware phức tạp hơn, dùng class:

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp, requests_per_minute: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self._counters: dict[str, list[float]] = {}

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host
        now = time.time()

        # Lấy timestamps của 1 phút gần nhất
        timestamps = self._counters.get(client_ip, [])
        timestamps = [t for t in timestamps if now - t < 60]

        if len(timestamps) >= self.requests_per_minute:
            from fastapi.responses import JSONResponse
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests"},
                headers={"Retry-After": "60"},
            )

        timestamps.append(now)
        self._counters[client_ip] = timestamps

        return await call_next(request)


app.add_middleware(RateLimitMiddleware, requests_per_minute=100)

Chú ý: implementation này chỉ là demo. Production cần lưu counter vào Redis để hoạt động với nhiều server instance.

11. Các middleware có sẵn của FastAPI

FastAPI và Starlette có một số middleware built-in rất hữu ích:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

app = FastAPI()

# CORS - cho phép cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Gzip - nén response > 1KB để giảm băng thông
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Chỉ chấp nhận request từ các host được whitelist
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "*.example.com"],
)

# Redirect HTTP → HTTPS (cho production)
app.add_middleware(HTTPSRedirectMiddleware)

12. Thứ tự middleware quan trọng

Middleware được thực thi theo thứ tự ngược lại so với khi bạn add:

app.add_middleware(MiddlewareA)  # Add đầu tiên
app.add_middleware(MiddlewareB)
app.add_middleware(MiddlewareC)  # Add cuối cùng

# Request flow:
# Request → C → B → A → Endpoint → A → B → C → Response
# (C là outer nhất, A là inner nhất)

Tức là middleware add cuối cùng sẽ chạy đầu tiên khi request đi vào, và cuối cùng khi response đi ra. Nhớ điều này khi sắp xếp các middleware phụ thuộc vào nhau.

13. Middleware vs Dependencies — Khi nào dùng gì?

Hai công cụ có vẻ giống nhau nhưng khác biệt quan trọng:

  • Middleware: áp dụng cho MỌI request, kể cả static files, không biết endpoint nào sẽ được gọi. Nhận Request raw, không có access vào các param đã được parse. Thích hợp cho: logging, CORS, compression, request ID.
  • Dependencies: áp dụng có chọn lọc (per-endpoint hoặc per-router). Chạy sau khi FastAPI đã parse path/query/body. Có thể trả về giá trị cho endpoint dùng. Thích hợp cho: authentication, permission check, DB session, business logic chung.

Quy tắc đơn giản: nếu logic liên quan đến nội dung cụ thể của endpoint (user, permission, data), dùng dependency. Nếu là infrastructure chung cho mọi request, dùng middleware.

14. Background Tasks — Trả response ngay, làm việc sau

Đôi khi sau khi xử lý request, bạn cần làm việc gì đó tốn thời gian nhưng không nhất thiết client phải chờ: gửi email xác nhận, sinh report, gửi webhook, update search index…

Background tasks cho phép bạn trả response cho client ngay lập tức, và code background chạy sau đó.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def send_welcome_email(email: str, name: str):
    # Giả lập gửi email - mất 3 giây
    import time
    time.sleep(3)
    print(f"Đã gửi email chào mừng tới {email}")

def log_user_action(user_id: int, action: str):
    with open("audit.log", "a") as f:
        f.write(f"{user_id}: {action}\n")

@app.post("/users")
def create_user(
    email: str,
    name: str,
    background_tasks: BackgroundTasks,
):
    user = save_user(email, name)

    # Đăng ký task chạy SAU KHI response được gửi
    background_tasks.add_task(send_welcome_email, email, name)
    background_tasks.add_task(log_user_action, user["id"], "created")

    return {"id": user["id"], "email": email}
    # Client nhận response ngay, không phải chờ 3 giây cho email

15. Background tasks trong dependencies

Bạn có thể đăng ký background task từ bên trong dependency — rất tiện cho audit logging:

def audit_logger(
    request: Request,
    background_tasks: BackgroundTasks,
):
    def log_request():
        logger.info(
            f"Audit: {request.method} {request.url.path} "
            f"at {datetime.utcnow()}"
        )
    background_tasks.add_task(log_request)

@app.post("/tasks", dependencies=[Depends(audit_logger)])
def create_task(task: TaskCreate):
    # audit tự động được log sau response
    ...

16. Giới hạn của BackgroundTasks

Đây là phần cực kỳ quan trọng mà nhiều người bỏ qua:

  • Background tasks chạy trong cùng process với FastAPI app. Nếu task ngốn nhiều CPU/memory, nó làm chậm cả server. Nếu server restart, task đang chạy bị mất.
  • Dùng BackgroundTasks khi: task ngắn (vài giây), không quan trọng nếu thỉnh thoảng thất bại, không cần retry, ví dụ: gửi email thông báo, log audit, cleanup file tạm.
  • Đừng dùng khi: task dài (vài phút trở lên), cần retry khi thất bại, cần chạy định kỳ, task quan trọng không được mất. Lúc đó cần task queue thực sự.

17. Khi nào cần task queue (Celery, RQ, Dramatiq)

Với các task “nghiêm túc”, dùng task queue chuyên dụng:

# Với Celery
from celery import Celery

celery_app = Celery("taskapi", broker="redis://localhost:6379/0")

@celery_app.task(bind=True, max_retries=3)
def generate_report(self, task_id: int, user_email: str):
    try:
        # Xử lý tốn thời gian (vài phút)
        report = build_huge_report(task_id)
        send_email(user_email, report)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# Trong FastAPI endpoint
@app.post("/tasks/{task_id}/report")
def request_report(task_id: int, user_email: str):
    # Enqueue vào Celery, chạy ở worker process riêng
    generate_report.delay(task_id, user_email)
    return {"message": "Report đang được tạo, sẽ gửi qua email khi xong"}

So sánh nhanh:

  • BackgroundTasks: built-in, đơn giản, không cần infrastructure thêm. Phù hợp task ngắn, không critical.
  • Celery/RQ/Dramatiq: cần Redis/RabbitMQ, phức tạp hơn. Phù hợp task dài, cần retry, cần monitoring, cần scaling worker độc lập với API.

Khoá học này tập trung vào BackgroundTasks. Khi dự án của bạn scale đến mức cần task queue, Celery là lựa chọn phổ biến nhất trong Python.

18. Lifespan Events — Startup và Shutdown

Ngoài per-request hooks, bạn có thể chạy code khi app khởi độngtắt:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # === Startup ===
    print("App đang khởi động...")
    app.state.db_pool = await create_db_pool()
    app.state.redis = await connect_redis()
    print("Sẵn sàng nhận request")

    yield  # App chạy ở đây

    # === Shutdown ===
    print("App đang tắt...")
    await app.state.db_pool.close()
    await app.state.redis.close()
    print("Cleanup xong")


app = FastAPI(lifespan=lifespan)

Lifespan rất quan trọng cho:

Tạo connection pool (DB, Redis, HTTP client) một lần, dùng cho cả vòng đời app — không phải tạo mới mỗi request. Load model ML vào memory lúc start, không phải load lại mỗi request. Đóng connection và file handle khi shutdown để tránh data loss.

19. Ví dụ tổng hợp: Tất cả cùng nhau

Cập nhật TaskAPI với mọi thứ vừa học:

# app/main.py
import time
import uuid
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from app.routers import tasks
from app.config import settings

logger = logging.getLogger("taskapi")

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("TaskAPI khởi động")
    # app.state.db = await create_pool()
    yield
    logger.info("TaskAPI tắt")
    # await app.state.db.close()


app = FastAPI(
    title="TaskAPI",
    version="0.1.0",
    lifespan=lifespan,
)

# === Middlewares ===
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000)

@app.middleware("http")
async def request_context(request: Request, call_next):
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    start = time.time()

    logger.info(f"[{request_id}] {request.method} {request.url.path}")

    try:
        response = await call_next(request)
    except Exception:
        logger.exception(f"[{request_id}] Unhandled exception")
        raise

    duration = time.time() - start
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Process-Time"] = f"{duration:.4f}"
    logger.info(
        f"[{request_id}] {response.status_code} ({duration:.3f}s)"
    )
    return response


# === Routers ===
app.include_router(tasks.router, prefix="/api/v1")
# app/routers/tasks.py
from fastapi import APIRouter, BackgroundTasks, Depends, status
from app.models.task import TaskCreate, TaskResponse
from app.dependencies import get_current_user, pagination_params
from app.services.notifications import notify_task_created

router = APIRouter(prefix="/tasks", tags=["tasks"])

@router.post(
    "/",
    response_model=TaskResponse,
    status_code=status.HTTP_201_CREATED,
)
def create_task(
    task: TaskCreate,
    background_tasks: BackgroundTasks,
    user: dict = Depends(get_current_user),
):
    new_task = save_task(task, owner_id=user["id"])

    # Chạy background: gửi notification, update search index
    background_tasks.add_task(notify_task_created, new_task, user["email"])
    background_tasks.add_task(update_search_index, new_task)

    return new_task

@router.get("/")
def list_tasks(
    user: dict = Depends(get_current_user),
    pagination: dict = Depends(pagination_params),
):
    return get_user_tasks(user["id"], **pagination)

20. Bài tập

Mở rộng TaskAPI của bạn:

  • Viết dependency require_permission(permission: str) dùng pattern dependency-with-parameters (gợi ý: class-based hoặc closure).
  • Tạo middleware CorrelationIDMiddleware đọc X-Correlation-ID từ request, generate nếu chưa có, truyền vào response. Implement background task gửi “task due reminder” khi có task được tạo với due_date.
  • Chuyển hardcoded DB dict sang dùng dependency get_db() với yield pattern (chuẩn bị cho Bài 10).
  • Thêm middleware log ra file với format JSON (mỗi dòng là 1 JSON object, dễ parse bằng tools phân tích log).

Tổng kết

Đây là một trong những bài “nặng” nhất của khoá học, và bạn vừa nắm được các kỹ thuật quan trọng nhất của FastAPI:

  • Dependency Injection: công cụ mạnh nhất để tổ chức code — từ dependency đơn giản đến sub-dependencies, class-based, yield pattern cho resource management, và global dependencies.
  • Middleware: xử lý cross-cutting concerns cho mọi request — logging, CORS, compression, rate limiting. Hiểu rõ khi nào dùng middleware vs dependency.
  • Background Tasks: trả response nhanh cho client, xử lý việc tốn thời gian sau. Biết giới hạn và khi nào cần chuyển sang task queue thực sự.
  • Lifespan events: quản lý resource ở vòng đời app.

Những pattern này sẽ xuất hiện lặp đi lặp lại trong mọi project FastAPI của bạn. Hãy luyện tập cho đến khi chúng trở thành “phản xạ”.

Bài 8 tiếp theo, chúng ta sẽ tập trung vào một chủ đề rất thực tế: xử lý lỗi. Làm sao để custom exception handlers, thiết kế format lỗi thống nhất cho cả API, xử lý validation errors đẹp hơn, và các pattern xử lý lỗi trong production.

Leave a Reply

Discover more from Bệ Phóng Việt

Subscribe now to keep reading and get access to the full archive.

Continue reading