Chào mừng bạn đến với Bài 9 của khoá học Python FastAPI. Bài này về một chủ đề mà mọi API thực tế đều cần: xác thực (authentication) và ủy quyền (authorization).
Đây là chủ đề mà rất nhiều developer làm sai, và hậu quả có thể rất nặng: rò rỉ dữ liệu người dùng, tài khoản bị hack, hoặc tệ hơn — toàn bộ database bị chiếm đoạt.
Mình đã review nhiều codebase nơi developer “tự viết” authentication và chứng kiến đủ loại lỗi bảo mật cơ bản: lưu password dạng plain text, JWT không có expiration, secret key hardcoded trong code, không validate token đúng cách…
Trong bài này, chúng ta sẽ làm đúng ngay từ đầu. Bạn sẽ học cách implement authentication với JWT, password hashing an toàn, role-based authorization, và các best practices bảo mật. Tiếp tục nâng cấp TaskAPI để nó sẵn sàng cho người dùng thật. 🔐
1. Authentication vs Authorization — Nhắc lại
Ở Bài 4 chúng ta đã chạm vào hai khái niệm này. Nhắc lại nhanh vì đây là nền tảng của cả bài:
- Authentication (AuthN): “Bạn là ai?” — verify danh tính. Thực hiện qua username/password, token, OAuth… Thường làm MỘT LẦN khi user đăng nhập.
- Authorization (AuthZ): “Bạn được phép làm gì?” — kiểm tra quyền sau khi đã biết user là ai. Thực hiện ở MỌI request cần quyền.
Hai việc này luôn đi cùng nhau: authenticate trước (biết user là ai), authorize sau (kiểm tra quyền của user đó).
2. Các phương pháp authentication phổ biến
Trong REST API, có vài phương pháp phổ biến:
- Basic Auth: username:password encode base64, gửi trong header. Đơn giản nhưng kém an toàn — chỉ dùng cho admin panel nội bộ đi qua HTTPS.
- API Key: một chuỗi bí mật gửi trong header (thường
X-API-Key). Thích hợp cho machine-to-machine. Đơn giản, nhưng không có user context rõ ràng. - JWT (JSON Web Token): token có cấu trúc, tự chứa thông tin user, có chữ ký số để chống giả mạo. Phổ biến nhất hiện nay cho SPA/mobile app.
- OAuth 2.0: framework cho phép user đăng nhập qua Google/Facebook/GitHub… Phức tạp hơn, dùng khi cần SSO hoặc third-party login.
- Session-based: server lưu session trong Redis/DB, client giữ session ID trong cookie. Phù hợp cho web app truyền thống, ít dùng cho API-only.
Trong bài này chúng ta tập trung vào JWT — giải pháp cân bằng giữa đơn giản và mạnh mẽ, phù hợp nhất cho REST API.
3. Password Hashing — Việc đầu tiên phải làm đúng
Quy tắc số 1 của security: KHÔNG BAO GIỜ lưu password dạng plain text. Ngay cả mã hoá 2 chiều cũng không đủ — nếu database bị lộ, password vẫn bị lộ.
Luôn dùng password hashing — một chiều, không thể reverse. Hiện tại, bcrypt và argon2 là hai lựa chọn chuẩn. Chúng ta dùng bcrypt qua thư viện passlib:
pip install "passlib[bcrypt]" "python-jose[cryptography]" python-multipart
Ba package này: passlib cho hashing password, python-jose cho JWT, python-multipart cần cho form data (OAuth2 password flow).
# app/security.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""Hash password trước khi lưu DB"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Kiểm tra password khi login"""
return pwd_context.verify(plain_password, hashed_password)
Sử dụng:
# Khi đăng ký
password = "super_secret_123"
hashed = hash_password(password)
print(hashed)
# $2b$12$KIXxP.8dG...rất-dài...
# Khi đăng nhập
if verify_password("super_secret_123", hashed):
print("Password đúng")
else:
print("Password sai")
Điểm quan trọng: bcrypt tự động thêm “salt” (một chuỗi ngẫu nhiên) vào mỗi password trước khi hash. Nghĩa là cùng một password "abc123" hash hai lần sẽ ra hai kết quả khác nhau — nhưng verify_password vẫn hoạt động đúng. Salt ngăn tấn công rainbow table.
4. JWT — JSON Web Token
JWT là một token có cấu trúc, gồm 3 phần ngăn cách bằng dấu chấm: header.payload.signature.
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjMiLCJleHAiOjE3...MzI.v8YD...xP2Q
- Header: thuật toán ký, loại token.
- Payload: dữ liệu (user id, role, expiration…).
- Signature: chữ ký số để verify token không bị sửa.
Điểm quan trọng nhất cần hiểu: payload của JWT được encode bằng base64, KHÔNG phải mã hoá. Ai cũng có thể decode và đọc được nội dung. Chữ ký chỉ đảm bảo token không bị sửa đổi — không đảm bảo bí mật.
Vì vậy: KHÔNG BAO GIỜ cho password, API key, hay thông tin nhạy cảm vào JWT payload. Chỉ dùng cho thông tin “công khai” như user id, role, username.
Tạo và verify JWT
# app/security.py
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from app.config import settings
ALGORITHM = "HS256"
def create_access_token(
subject: str,
expires_delta: timedelta = None,
extra_claims: dict = None,
) -> str:
"""Tạo JWT access token"""
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.access_token_expire_minutes)
)
payload = {
"sub": str(subject), # subject - thường là user_id
"exp": expire, # expiration time
"iat": datetime.now(timezone.utc), # issued at
"type": "access", # token type
}
if extra_claims:
payload.update(extra_claims)
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
"""Decode và verify JWT. Raise JWTError nếu token không hợp lệ."""
return jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
Config tương ứng:
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
import secrets
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env")
app_name: str = "TaskAPI"
# QUAN TRỌNG: secret_key phải dài, ngẫu nhiên, và BÍ MẬT
secret_key: str = secrets.token_urlsafe(32)
access_token_expire_minutes: int = 30
refresh_token_expire_days: int = 7
settings = Settings()
Cảnh báo bảo mật cực kỳ quan trọng: secret_key PHẢI được lưu trong environment variable, không bao giờ hardcode trong code hay commit vào git. Nếu secret key bị lộ, kẻ tấn công có thể tự tạo JWT giả mạo bất kỳ user nào.
Tạo secret key tốt trong terminal:
python -c "import secrets; print(secrets.token_urlsafe(32))"
5. User model và database
Bài này chúng ta dùng “database” giả trong memory (để không phụ thuộc vào Bài 10). Tạo user model:
# app/models/user.py
from datetime import datetime
from enum import Enum
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field
class UserRole(str, Enum):
user = "user"
admin = "admin"
class UserBase(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str = Field(..., min_length=8, max_length=72)
# Chú ý 72: giới hạn của bcrypt
class UserResponse(UserBase):
id: int
role: UserRole
is_active: bool
created_at: datetime
class UserInDB(UserResponse):
"""User với hashed password - CHỈ dùng nội bộ, không expose ra API"""
hashed_password: str
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class TokenPayload(BaseModel):
sub: str # user_id
exp: int
type: str
Và database giả:
# app/services/user_service.py
from datetime import datetime, timezone
from typing import Optional
from app.models.user import UserCreate, UserInDB, UserRole
from app.security import hash_password, verify_password
from app.exceptions import ConflictError, UnauthorizedError
_users_db: dict[int, UserInDB] = {}
_next_id = 1
class UserService:
@staticmethod
def create_user(data: UserCreate) -> UserInDB:
global _next_id
# Check email và username đã tồn tại
if any(u.email == data.email for u in _users_db.values()):
raise ConflictError(detail="Email đã được đăng ký")
if any(u.username == data.username for u in _users_db.values()):
raise ConflictError(detail="Username đã tồn tại")
user = UserInDB(
id=_next_id,
email=data.email,
username=data.username,
full_name=data.full_name,
role=UserRole.user,
is_active=True,
hashed_password=hash_password(data.password),
created_at=datetime.now(timezone.utc),
)
_users_db[_next_id] = user
_next_id += 1
return user
@staticmethod
def get_by_id(user_id: int) -> Optional[UserInDB]:
return _users_db.get(user_id)
@staticmethod
def get_by_username(username: str) -> Optional[UserInDB]:
return next(
(u for u in _users_db.values() if u.username == username),
None,
)
@staticmethod
def authenticate(username: str, password: str) -> UserInDB:
"""Xác thực user với username + password"""
user = UserService.get_by_username(username)
if not user:
# Đừng để lộ rằng user không tồn tại
raise UnauthorizedError(detail="Username hoặc password không đúng")
if not verify_password(password, user.hashed_password):
raise UnauthorizedError(detail="Username hoặc password không đúng")
if not user.is_active:
raise UnauthorizedError(detail="Tài khoản đã bị vô hiệu hoá")
return user
Chú ý thông báo lỗi khi login sai: luôn dùng message chung chung “Username hoặc password không đúng”, không phân biệt “username không tồn tại” vs “password sai”. Nếu phân biệt, kẻ tấn công dùng được để liệt kê username hợp lệ trong hệ thống.
6. Login endpoint với OAuth2 Password Flow
FastAPI có sẵn support cho OAuth2 — chúng ta dùng OAuth2 Password Flow (đơn giản nhất, phù hợp khi bạn tự quản lý user trong DB của mình):
# app/routers/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, status
from fastapi.security import OAuth2PasswordRequestForm
from app.models.user import Token, UserCreate, UserResponse
from app.services.user_service import UserService
from app.security import create_access_token
from app.config import settings
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post(
"/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
)
def register(data: UserCreate):
user = UserService.create_user(data)
return user
@router.post("/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""
Login với username + password.
Response: access_token + refresh_token.
"""
user = UserService.authenticate(form_data.username, form_data.password)
access_token = create_access_token(
subject=user.id,
expires_delta=timedelta(minutes=settings.access_token_expire_minutes),
extra_claims={"role": user.role.value},
)
refresh_token = create_access_token(
subject=user.id,
expires_delta=timedelta(days=settings.refresh_token_expire_days),
extra_claims={"type": "refresh"},
)
return Token(
access_token=access_token,
refresh_token=refresh_token,
)
OAuth2PasswordRequestForm là dependency built-in của FastAPI, tự động nhận username và password từ form data (không phải JSON). Đây là chuẩn OAuth2, để client và Swagger UI tương thích tốt.
7. Access token vs Refresh token
Bạn có thể tự hỏi: tại sao cần 2 token? Đây là pattern bảo mật quan trọng:
- Access token: sống ngắn (15-30 phút), dùng để gọi API. Nếu bị lộ, kẻ tấn công chỉ có thời gian ngắn để lợi dụng.
- Refresh token: sống lâu (7-30 ngày), chỉ dùng để xin access token mới. Không dùng để gọi API thông thường. Nếu phát hiện bị lộ, có thể “revoke” để đá user ra.
Flow điển hình: user login → nhận cả 2 token → dùng access token để gọi API → khi access token hết hạn → dùng refresh token xin access token mới → không cần login lại.
# app/routers/auth.py
from pydantic import BaseModel
from jose import JWTError
from app.security import decode_token
class RefreshRequest(BaseModel):
refresh_token: str
@router.post("/refresh", response_model=Token)
def refresh_access_token(data: RefreshRequest):
try:
payload = decode_token(data.refresh_token)
except JWTError:
raise UnauthorizedError(detail="Refresh token không hợp lệ")
if payload.get("type") != "refresh":
raise UnauthorizedError(detail="Token không phải refresh token")
user_id = int(payload["sub"])
user = UserService.get_by_id(user_id)
if not user or not user.is_active:
raise UnauthorizedError(detail="User không tồn tại hoặc đã bị vô hiệu hoá")
# Tạo access token mới
access_token = create_access_token(
subject=user.id,
extra_claims={"role": user.role.value},
)
# Có thể tạo luôn refresh token mới (token rotation - an toàn hơn)
new_refresh_token = create_access_token(
subject=user.id,
expires_delta=timedelta(days=settings.refresh_token_expire_days),
extra_claims={"type": "refresh"},
)
return Token(access_token=access_token, refresh_token=new_refresh_token)
8. Dependency xác thực current user
Giờ đến phần quan trọng nhất: dependency để lấy user hiện tại từ JWT. Đây là “cổng” mà mọi endpoint cần auth sẽ đi qua:
# app/dependencies.py
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from app.security import decode_token
from app.services.user_service import UserService
from app.models.user import UserInDB
from app.exceptions import UnauthorizedError, ForbiddenError
# tokenUrl chỉ cho Swagger UI biết endpoint login ở đâu
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def get_current_user(token: str = Depends(oauth2_scheme)) -> UserInDB:
"""
Extract và verify JWT, return user.
Dùng cho mọi endpoint cần authenticated user.
"""
try:
payload = decode_token(token)
except JWTError:
raise UnauthorizedError(detail="Token không hợp lệ hoặc đã hết hạn")
if payload.get("type") != "access":
raise UnauthorizedError(detail="Token type không hợp lệ")
user_id_str = payload.get("sub")
if not user_id_str:
raise UnauthorizedError(detail="Token không chứa thông tin user")
user = UserService.get_by_id(int(user_id_str))
if not user:
raise UnauthorizedError(detail="User không tồn tại")
if not user.is_active:
raise UnauthorizedError(detail="Tài khoản đã bị vô hiệu hoá")
return user
def get_current_active_user(
user: UserInDB = Depends(get_current_user),
) -> UserInDB:
"""Alias cho trường hợp muốn rõ ràng user phải active"""
return user
Sử dụng trong endpoint:
# app/routers/users.py
from fastapi import APIRouter, Depends
from app.models.user import UserResponse, UserInDB
from app.dependencies import get_current_user
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/me", response_model=UserResponse)
def get_me(current_user: UserInDB = Depends(get_current_user)):
"""Lấy thông tin user đang đăng nhập"""
return current_user
Khi client gọi GET /api/v1/users/me mà không có header Authorization: Bearer ..., FastAPI tự động trả 401. Khi có token nhưng hết hạn, trả 401 với message từ exception. Code endpoint cực kỳ sạch — toàn bộ logic auth ở dependency.
9. Role-based Access Control (RBAC)
Có user thôi chưa đủ — cần phân quyền. Pattern đơn giản nhất: mỗi user có một role, và endpoint yêu cầu role cụ thể.
# app/dependencies.py
from app.models.user import UserRole
def require_role(*allowed_roles: UserRole):
"""
Factory tạo dependency kiểm tra role.
Cách dùng: Depends(require_role(UserRole.admin))
"""
def dependency(user: UserInDB = Depends(get_current_user)) -> UserInDB:
if user.role not in allowed_roles:
raise ForbiddenError(
detail=f"Yêu cầu role: {', '.join(r.value for r in allowed_roles)}"
)
return user
return dependency
# Các shortcut thường dùng
require_admin = require_role(UserRole.admin)
Sử dụng:
from app.dependencies import require_admin, require_role
@router.delete("/users/{user_id}")
def delete_user(
user_id: int,
admin: UserInDB = Depends(require_admin),
):
# Chỉ admin mới vào được đây
...
# Hoặc cho phép nhiều role
@router.get("/reports")
def get_reports(
user: UserInDB = Depends(require_role(UserRole.admin, UserRole.manager)),
):
...
10. Resource-based Authorization
RBAC chưa đủ cho mọi trường hợp. Ví dụ: user không được xoá task của user khác — nhưng cả hai đều có role user. Đây là resource-based authorization, cần check trên từng resource cụ thể:
# app/dependencies.py
from app.services.task_service import TaskService
from app.exceptions import TaskNotFoundError, ForbiddenError
def get_task_with_permission(
task_id: int,
user: UserInDB = Depends(get_current_user),
) -> dict:
"""
Lấy task và kiểm tra user có quyền truy cập.
Rules:
- Admin: truy cập mọi task.
- User: chỉ task của mình (owner_id == user.id).
"""
task = TaskService.get_task(task_id) # raise TaskNotFoundError nếu không có
# Admin bypass
if user.role == UserRole.admin:
return task
# User thường: phải là owner
if task["owner_id"] != user.id:
# Quan trọng: trả 404 thay vì 403 để không leak sự tồn tại của task
raise TaskNotFoundError(task_id)
return task
Chú ý một best practice: khi user không có quyền xem resource, nhiều API trả 404 thay vì 403. Lý do: 403 confirm rằng resource tồn tại — kẻ tấn công có thể enumerate ID để phát hiện resources của user khác. 404 không cho họ biết gì cả.
Sử dụng dependency này trong router:
@router.get("/tasks/{task_id}", response_model=TaskResponse)
def get_task(task: dict = Depends(get_task_with_permission)):
return task
@router.delete("/tasks/{task_id}", status_code=204)
def delete_task(
task: dict = Depends(get_task_with_permission),
user: UserInDB = Depends(get_current_user),
):
# Đã kiểm tra permission trong dependency
TaskService.delete(task["id"])
return None
11. Scope-based Authorization (cho API phức tạp)
Với API phức tạp hơn (đặc biệt là public API có nhiều quyền chi tiết), RBAC không đủ. Dùng scopes — danh sách permission nhỏ gán cho token:
from fastapi import Security
from fastapi.security import SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/api/v1/auth/login",
scopes={
"tasks:read": "Đọc tasks",
"tasks:write": "Tạo/sửa/xoá tasks",
"users:read": "Đọc thông tin users",
"admin": "Quyền admin",
},
)
def get_current_user_with_scopes(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme),
) -> UserInDB:
try:
payload = decode_token(token)
except JWTError:
raise UnauthorizedError(detail="Token không hợp lệ")
token_scopes = payload.get("scopes", [])
user_id = payload.get("sub")
# Check user tồn tại
user = UserService.get_by_id(int(user_id))
if not user:
raise UnauthorizedError(detail="User không tồn tại")
# Check scopes yêu cầu có trong token không
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise ForbiddenError(
detail=f"Thiếu scope yêu cầu: {scope}",
)
return user
# Sử dụng
@router.get("/tasks")
def list_tasks(
user: UserInDB = Security(
get_current_user_with_scopes,
scopes=["tasks:read"],
),
):
...
Scope linh hoạt hơn role — một user có thể có nhiều scope. Token cấp cho mobile app có thể chỉ có scope tasks:read trong khi token cấp cho admin dashboard có đầy đủ scopes.
12. Bảo mật password — Rules
Đã hash password là chưa đủ. Một số best practices nữa:
- Độ dài tối thiểu: ít nhất 8 ký tự. Lý tưởng là 12+. Đừng bắt user phải có “1 chữ hoa, 1 số, 1 ký tự đặc biệt” — nghiên cứu cho thấy điều này thường làm user chọn password dễ đoán hơn (“Password1!”). Thay vào đó, khuyến khích passphrase dài.
- Bcrypt giới hạn 72 bytes: nếu user nhập password dài hơn, chỉ 72 bytes đầu được hash. Nên enforce max 72 ký tự ở layer validation.
- Kiểm tra password bị leak: dùng API HIBP để check password có trong database rò rỉ không.
- Rate limiting endpoint login: tránh brute force. Giới hạn khoảng 5-10 lần/phút/IP.
from pydantic import BaseModel, Field, field_validator
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=50)
password: str = Field(..., min_length=8, max_length=72)
@field_validator("password")
@classmethod
def password_strength(cls, v: str) -> str:
if v.lower() in {"password", "12345678", "qwerty", "abc12345"}:
raise ValueError("Password quá yếu")
if len(set(v)) < 4:
raise ValueError("Password phải có ít nhất 4 ký tự khác nhau")
return v
13. Đổi password
Endpoint đổi password có vài rule bảo mật quan trọng:
class ChangePasswordRequest(BaseModel):
current_password: str
new_password: str = Field(..., min_length=8, max_length=72)
@router.post("/me/change-password")
def change_password(
data: ChangePasswordRequest,
user: UserInDB = Depends(get_current_user),
):
# QUAN TRỌNG: verify current password trước khi đổi
# Nếu access token bị đánh cắp, kẻ tấn công không đổi được password
# mà không biết current password
if not verify_password(data.current_password, user.hashed_password):
raise UnauthorizedError(detail="Password hiện tại không đúng")
if data.current_password == data.new_password:
raise BusinessRuleError(detail="Password mới phải khác password cũ")
# Update hashed password
user.hashed_password = hash_password(data.new_password)
UserService.update(user)
# Tốt nhất: revoke mọi access token và refresh token hiện tại
# (cần token blacklist hoặc token versioning - xem phần 14)
return {"message": "Đổi password thành công"}
14. Token Revocation — Problem và Solution
JWT có một nhược điểm lớn: stateless. Một khi token đã được tạo, bạn không thể “huỷ” nó — nó vẫn hợp lệ cho đến khi hết hạn.
Vấn đề: user đổi password, user bị ban, token bị đánh cắp… bạn không thể invalidate token ngay.
Có vài cách giải quyết:
Cách 1: Access token hết hạn rất ngắn
Đặt access token chỉ sống 5-15 phút. Khi revoke, chỉ cần blacklist refresh token — sau khi access token hết hạn, user bị đá ra.
Cách 2: Token blacklist trong Redis
import redis
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
def revoke_token(token_jti: str, expire_seconds: int):
"""Thêm token vào blacklist"""
redis_client.setex(f"blacklist:{token_jti}", expire_seconds, "1")
def is_token_revoked(token_jti: str) -> bool:
return redis_client.exists(f"blacklist:{token_jti}") > 0
# Thêm claim "jti" (JWT ID) khi tạo token
import uuid
def create_access_token(subject, ...):
payload = {
"sub": str(subject),
"jti": str(uuid.uuid4()), # unique ID cho mỗi token
"exp": expire,
...
}
return jwt.encode(payload, ...)
# Check blacklist trong get_current_user
def get_current_user(token: str = Depends(oauth2_scheme)):
payload = decode_token(token)
if is_token_revoked(payload["jti"]):
raise UnauthorizedError(detail="Token đã bị thu hồi")
...
Cách 3: Token versioning
Mỗi user có một field token_version. JWT chứa version này. Khi muốn revoke tất cả token của user (vd: đổi password), tăng version lên. Mọi token cũ tự động invalid.
Cách này đơn giản hơn blacklist (không cần Redis), thích hợp cho “logout everywhere” hoặc “force re-login”.
15. HTTPS là bắt buộc
Mình đã nói ở Bài 4 nhưng nhấn mạnh lại: mọi API có authentication PHẢI dùng HTTPS. Không có ngoại lệ.
- Lý do: với HTTP, token đi qua mạng dạng plain text. Ai sniff được packet là có token — và có thể impersonate user. HTTPS mã hoá toàn bộ request/response, ngăn điều này.
Trong production, thường dùng reverse proxy (nginx, Caddy, Cloudflare) để terminate HTTPS trước khi forward đến FastAPI. Chúng ta sẽ nói kỹ về deploy ở Bài 13.
16. Security Headers
Thêm các security headers cho mọi response:
@app.middleware("http")
async def security_headers(request, call_next):
response = await call_next(request)
# Ngăn XSS và clickjacking
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
# HTTPS strict (chỉ bật khi thực sự dùng HTTPS)
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
# Chặn browser gửi Referer header chứa URL nhạy cảm
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
return response
17. Rate Limiting cho endpoints nhạy cảm
Login, register, password reset — các endpoint này cần rate limit nghiêm ngặt để chống brute force:
pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@router.post("/login")
@limiter.limit("5/minute")
def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
...
@router.post("/register")
@limiter.limit("3/hour")
def register(request: Request, data: UserCreate):
...
18. Ví dụ tổng hợp: TaskAPI với Authentication
Cập nhật main.py:
# app/main.py
from fastapi import FastAPI
from app.error_handlers import register_exception_handlers
from app.routers import auth, users, tasks
app = FastAPI(title="TaskAPI", version="0.2.0")
register_exception_handlers(app)
app.include_router(auth.router, prefix="/api/v1")
app.include_router(users.router, prefix="/api/v1")
app.include_router(tasks.router, prefix="/api/v1")
Task router với authorization đầy đủ:
# app/routers/tasks.py
from fastapi import APIRouter, Depends, status
from app.models.task import TaskCreate, TaskResponse
from app.models.user import UserInDB
from app.dependencies import (
get_current_user,
get_task_with_permission,
require_admin,
)
from app.services.task_service import TaskService
router = APIRouter(prefix="/tasks", tags=["tasks"])
@router.get("/", response_model=list[TaskResponse])
def list_tasks(user: UserInDB = Depends(get_current_user)):
"""User chỉ thấy task của mình, admin thấy tất cả"""
if user.role == "admin":
return TaskService.get_all()
return TaskService.get_by_owner(user.id)
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
data: TaskCreate,
user: UserInDB = Depends(get_current_user),
):
return TaskService.create(data, owner_id=user.id)
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(task: dict = Depends(get_task_with_permission)):
return task
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
task: dict = Depends(get_task_with_permission),
):
TaskService.delete(task["id"])
@router.delete("/admin/purge-completed", status_code=status.HTTP_204_NO_CONTENT)
def purge_completed_tasks(admin: UserInDB = Depends(require_admin)):
"""Chỉ admin: xoá mọi task đã hoàn thành"""
TaskService.delete_all_completed()
Test workflow hoàn chỉnh trên Swagger UI:
Truy cập /docs. Gọi POST /api/v1/auth/register để tạo user. Gọi POST /api/v1/auth/login — copy access_token. Click nút “Authorize” ở góc trên phải, paste token. Giờ mọi endpoint có biểu tượng khoá đều gọi được. Gọi các endpoint của tasks — tự động được authenticated và authorized.
19. Checklist bảo mật authentication
Trước khi deploy, đảm bảo:
✅ Password được hash bằng bcrypt/argon2, không bao giờ lưu plain text.
✅ Secret key đủ dài (32+ bytes), lưu trong environment variable, không commit vào git.
✅ Access token sống ngắn (15-30 phút), refresh token dài hơn (7-30 ngày).
✅ JWT payload chỉ chứa thông tin không nhạy cảm.
✅ Login endpoint có rate limiting.
✅ Login error message chung chung, không reveal user tồn tại hay không.
✅ HTTPS bắt buộc trên production.
✅ Security headers (HSTS, X-Content-Type-Options, X-Frame-Options).
✅ Resource-based authorization trả 404 thay vì 403 khi không có quyền (trong hầu hết case).
✅ Có cơ chế revoke token (blacklist hoặc versioning) cho các trường hợp nhạy cảm.
✅ Change password endpoint yêu cầu current password.
✅ Không bao giờ trả về hashed_password ra response API (dùng model UserResponse không có field này).
20. Bài tập
Mở rộng TaskAPI:
- Thêm endpoint
POST /auth/forgot-password: nhận email, gửi token reset qua email (dùng BackgroundTasks từ Bài 7). - Token reset có format riêng (
type: "password_reset"), sống 15 phút. - Viết
POST /auth/reset-passwordnhận token và password mới. Implement token versioning — mỗi user cótoken_version, tăng khi đổi password để invalidate mọi token cũ. - Thêm RBAC với 3 role:
user,manager,admin. - Manager có thể xem task của user trong team mình (thêm field
team_idvào User và Task). - Test case: user A không được GET task của user B, cả khi A đoán được task_id. Response phải là 404.
- Viết
POST /auth/logoutthêm token vào Redis blacklist. Updateget_current_userđể check blacklist.
Tổng kết
Trong bài này, bạn đã học cách implement authentication và authorization đầy đủ cho API:
- Password hashing an toàn với bcrypt — không bao giờ plain text. JWT với access token và refresh token pattern. Dependency
get_current_userxử lý toàn bộ logic auth, endpoint cực kỳ sạch. Role-based access control (RBAC) với factory patternrequire_role(). - Resource-based authorization cho các trường hợp phức tạp.
- Scope-based authorization cho API có nhiều quyền chi tiết.
- Token revocation bằng blacklist hoặc versioning.
- Security best practices: HTTPS, rate limiting, security headers, password rules.
- Đây là foundation bảo mật của API. Với những gì vừa học, TaskAPI của bạn đã đủ an toàn để deploy và phục vụ người dùng thật.
Ở Bài 10 tiếp theo, chúng ta sẽ thay thế “database giả” bằng database thật. Bạn sẽ học cách tích hợp SQLAlchemy với FastAPI, thiết kế schema, migration với Alembic, và các pattern làm việc với database trong môi trường async.
