auth_utils.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import json
  2. from fastapi import Header
  3. from typing import Optional
  4. from app.core.security import ServerEnum
  5. from app.utils.jwt_utils import verify_jwt_token
  6. from app.models.user import User
  7. from app.core.exceptions import (
  8. UnauthorizedException,
  9. ValidationException,
  10. StatusCode,
  11. ErrorMessage
  12. )
  13. def USER_KEY(user_id: str, server: str) -> str:
  14. """生成用户缓存key"""
  15. return f"user:{user_id}:{server}"
  16. class AuthService:
  17. """认证服务"""
  18. @staticmethod
  19. async def verify_and_get_user(
  20. authorization: Optional[str] = Header(None),
  21. source: Optional[str] = Header(None)
  22. ) -> User:
  23. """验证用户token并返回用户信息"""
  24. # 检查请求来源
  25. if not source or not ServerEnum.exist(source):
  26. raise ValidationException(detail=ErrorMessage.INVALID_HEADER)
  27. # 检查token是否存在
  28. if not authorization:
  29. raise UnauthorizedException(detail=ErrorMessage.UNAUTHORIZED)
  30. token = authorization.replace("Bearer ", "")
  31. # 验证JWT token
  32. claims = verify_jwt_token(token)
  33. if not claims:
  34. raise UnauthorizedException(
  35. detail=ErrorMessage.INVALID_TOKEN,
  36. code=StatusCode.INVALID_TOKEN
  37. )
  38. user_id = claims.get("sub")
  39. if not user_id:
  40. raise UnauthorizedException(
  41. detail=ErrorMessage.INVALID_TOKEN,
  42. code=StatusCode.INVALID_TOKEN
  43. )
  44. return User(id=user_id, token=token)
  45. # 创建全局实例
  46. auth_service = AuthService()