| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- import json
- from fastapi import Header
- from typing import Optional
- from app.core.security import ServerEnum
- from app.utils.jwt_utils import verify_jwt_token
- from app.models.user import User
- from app.core.exceptions import (
- UnauthorizedException,
- ValidationException,
- StatusCode,
- ErrorMessage
- )
- def USER_KEY(user_id: str, server: str) -> str:
- """生成用户缓存key"""
- return f"user:{user_id}:{server}"
- class AuthService:
- """认证服务"""
- @staticmethod
- async def verify_and_get_user(
- authorization: Optional[str] = Header(None),
- source: Optional[str] = Header(None)
- ) -> User:
- """验证用户token并返回用户信息"""
- # 检查请求来源
- if not source or not ServerEnum.exist(source):
- raise ValidationException(detail=ErrorMessage.INVALID_HEADER)
- # 检查token是否存在
- if not authorization:
- raise UnauthorizedException(detail=ErrorMessage.UNAUTHORIZED)
-
- token = authorization.replace("Bearer ", "")
- # 验证JWT token
- claims = verify_jwt_token(token)
- if not claims:
- raise UnauthorizedException(
- detail=ErrorMessage.INVALID_TOKEN,
- code=StatusCode.INVALID_TOKEN
- )
- user_id = claims.get("sub")
- if not user_id:
- raise UnauthorizedException(
- detail=ErrorMessage.INVALID_TOKEN,
- code=StatusCode.INVALID_TOKEN
- )
- return User(id=user_id, token=token)
- # 创建全局实例
- auth_service = AuthService()
|