import json
from typing import Optional

from fastapi import Header

from app.core.security import ServerEnum
from app.utils.jwt_utils import verify_jwt_token
from app.models.user import User
from app.core.exceptions import (
    UnauthorizedException,
    ValidationException,
    StatusCode,
    ErrorMessage,
)


def USER_KEY(user_id: str, server: str) -> str:
    """Build the cache key for a user.

    Args:
        user_id: Identifier of the user.
        server: Name of the server the user belongs to.

    Returns:
        A key of the form ``user:<user_id>:<server>``.
    """
    return f"user:{user_id}:{server}"


def _extract_bearer_token(authorization: str) -> str:
    """Return the credentials part of an ``Authorization`` header value.

    Only a *leading* ``Bearer`` scheme is removed, and the scheme is matched
    case-insensitively. A value that does not start with that scheme is
    returned unchanged apart from surrounding whitespace, so clients that
    send a bare token keep working.

    Args:
        authorization: Raw, non-empty value of the ``Authorization`` header.

    Returns:
        The token with the scheme and surrounding whitespace removed. May be
        an empty string when the header carries a scheme but no credentials.
    """
    value = authorization.strip()
    scheme, _, credentials = value.partition(" ")
    if scheme.lower() == "bearer":
        return credentials.strip()
    return value


class AuthService:
    """Authentication service."""

    @staticmethod
    async def verify_and_get_user(
        authorization: Optional[str] = Header(None),
        source: Optional[str] = Header(None),
    ) -> User:
        """Validate the caller's token and return the matching user.

        Args:
            authorization: Value of the ``Authorization`` request header.
            source: Value of the ``source`` request header; it must be
                accepted by ``ServerEnum.exist``.

        Returns:
            A ``User`` built from the ``sub`` claim and the extracted token.

        Raises:
            ValidationException: If ``source`` is missing or rejected by
                ``ServerEnum.exist``.
            UnauthorizedException: If the ``Authorization`` header is
                missing, the token does not verify, or the claims carry no
                ``sub`` entry.
        """
        # Check the request origin.
        if not source or not ServerEnum.exist(source):
            raise ValidationException(detail=ErrorMessage.INVALID_HEADER)

        # Check that a token was supplied at all.
        if not authorization:
            raise UnauthorizedException(detail=ErrorMessage.UNAUTHORIZED)

        # Strip only the leading scheme; a blanket str.replace would also
        # remove any later occurrence of the substring inside the value.
        token = _extract_bearer_token(authorization)

        # Verify the JWT; a falsy result is treated as an invalid token.
        claims = verify_jwt_token(token)
        if not claims:
            raise UnauthorizedException(
                detail=ErrorMessage.INVALID_TOKEN,
                code=StatusCode.INVALID_TOKEN,
            )

        # The "sub" claim is used as the user id.
        user_id = claims.get("sub")
        if not user_id:
            raise UnauthorizedException(
                detail=ErrorMessage.INVALID_TOKEN,
                code=StatusCode.INVALID_TOKEN,
            )

        return User(id=user_id, token=token)


# Module-level shared instance.
auth_service = AuthService()