import requests import time from typing import Optional from app.core.config import settings from app.core.exceptions import ( UnauthorizedException, ForbiddenException, BusinessException, InternalServerException, ErrorMessage, StatusCode ) class request_service: def __init__(self): """初始化请求服务""" self.base_url = settings.JAVA_API_BASE_URL self.session = requests.Session() self.session.timeout = 80 # 80秒超时 self.max_retries = 3 # 最大重试次数 self.retry_delay = 1 # 重试延迟(秒) def request(self, method: str, url: str, auth: bool, authorization: Optional[str] = None, source: Optional[str] = None, version: Optional[str] = None, **kwargs) -> any: """ 发送请求的核心方法 参数: method: HTTP方法 (GET/POST等) url: 请求路径 auth: 是否需要认证 authorization: Authorization token source: Source 头部,默认为 'customer' version: Version 头部,默认为 'V6' **kwargs: 其他请求参数 返回: 响应数据或文件对象 异常: 抛出请求或业务逻辑相关的异常 """ # 1. 准备请求头 headers = self._prepare_headers(auth, authorization, source, version) # 2. 处理GET请求的数组参数 if method.upper() == 'GET' and 'params' in kwargs: kwargs['params'] = self._process_array_params(kwargs['params']) # 3. 设置请求头 kwargs['headers'] = headers # 4. 发送请求(带重试机制) response = self._send_request_with_retry(method, url, **kwargs) # 5. 处理响应 return self._handle_response(response) def _prepare_headers(self, auth: bool, authorization: Optional[str] = None, source: Optional[str] = None, version: Optional[str] = None) -> 'dict[str, str]': """准备请求头""" headers = { 'Source': source or 'customer', 'Content-Type': 'application/json', 'Version': version or 'V6' } if auth and authorization: headers['Authorization'] = authorization return headers def _process_array_params(self, params: 'dict[str, any]') -> 'dict[str, any]': """处理GET请求中的数组参数""" processed = {} for key, value in params.items(): if isinstance(value, list): processed[key] = ','.join(map(str, value)) else: processed[key] = value return processed def _handle_response(self, response: requests.Response) -> any: """处理响应""" # 处理文件类型的响应 if response.headers.get('requesttype') == 'file': return response try: res = response.json() except ValueError: raise InternalServerException(detail=ErrorMessage.INVALID_RESPONSE) if not res: return None # 处理业务逻辑错误 if res.get('code') != 200: self._handle_business_error(res) return res.get('data') def _handle_business_error(self, response_data: 'dict[str, any]'): """处理业务逻辑错误,直接使用Java服务返回的code和message""" error_code = response_data.get('code') error_msg = response_data.get('message', '未知系统错误') # 可接受的错误码,直接返回(不抛出异常) if error_code in (-1, 0): return # 根据错误码映射HTTP状态码,但保留原始的code和message # 600, 601 -> 401 (未授权) if error_code in (600, 601): raise UnauthorizedException( detail=error_msg, code=error_code ) # 403或权限相关 -> 403 (禁止访问) elif error_code == 403 or "无该货主权限" in error_msg or "无权限" in error_msg or "没有权限" in error_msg: raise ForbiddenException( detail=error_msg, code=error_code if error_code else StatusCode.FORBIDDEN ) else: # 其他所有错误(包括1001等),直接使用原始code和message # HTTP状态码使用400,但响应体中的code保留原始值 raise BusinessException( detail=error_msg, status_code=400, code=error_code ) def _send_request_with_retry(self, method: str, url: str, **kwargs) -> requests.Response: """带重试机制的请求发送,打印完整请求数据""" last_exception = None full_url = f"{self.base_url}{url}" for attempt in range(self.max_retries): try: # 获取headers并确保它是字典 headers = kwargs.get('headers', {}) if headers is None: headers = {} safe_headers = dict(headers) # 更安全的方式创建副本 if 'Authorization' in safe_headers: safe_headers['Authorization'] = 'Bearer ******' # 发送请求 response = self.session.request(method, full_url, **kwargs) return response except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e: last_exception = e if attempt < self.max_retries - 1: time.sleep(self.retry_delay) raise InternalServerException( detail=str(last_exception) if last_exception else ErrorMessage.REQUEST_FAILED )