"""请求日志中间件。 记录 FastAPI 每个请求的路径、状态码、耗时和响应摘要,支持日志轮转。 """ import json import logging import os import time from datetime import datetime from logging.handlers import RotatingFileHandler from fastapi import FastAPI, Request from starlette.concurrency import iterate_in_threadpool def setup_request_logging(app: FastAPI, max_response_len: int, max_bytes: int, backup_count: int) -> None: """为应用安装请求日志中间件。""" logger = logging.getLogger("cargo_height.request") _setup_request_logger(logger, max_bytes=max_bytes, backup_count=backup_count) @app.middleware("http") async def request_log_middleware(request: Request, call_next): # 记录请求开始时间,用于后续耗时统计。 request_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] start = time.perf_counter() try: response = await call_next(request) except Exception: # 异常请求按 500 记录并保留堆栈。 elapsed_ms = (time.perf_counter() - start) * 1000 logger.exception( "request_time=%s method=%s path=%s status=%s duration_ms=%.2f response=%s", request_time, request.method, request.url.path, 500, elapsed_ms, "internal_error", ) raise # 读取响应体用于日志输出,然后恢复迭代器,避免影响客户端接收数据。 body = b"" async for chunk in response.body_iterator: body += chunk response.body_iterator = iterate_in_threadpool(iter([body])) response_text = _parse_response_text(body, response.headers.get("content-type", "")) if len(response_text) > max_response_len: response_text = response_text[:max_response_len] + "...(truncated)" elapsed_ms = (time.perf_counter() - start) * 1000 logger.info( "request_time=%s method=%s path=%s status=%s duration_ms=%.2f response=%s", request_time, request.method, request.url.path, response.status_code, elapsed_ms, response_text, ) return response def _setup_request_logger(logger: logging.Logger, max_bytes: int, backup_count: int) -> None: """初始化请求日志记录器(文件 + 控制台)。""" log_dir = os.path.join(os.getcwd(), "Log") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, "request.log") logger.setLevel(logging.INFO) logger.propagate = False if logger.handlers: return formatter = logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s - %(message)s", "%Y-%m-%d %H:%M:%S", ) file_handler = RotatingFileHandler( log_file, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8", ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.INFO) stream_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stream_handler) def _parse_response_text(body: bytes, content_type: str) -> str: """根据响应类型提取可读日志文本。""" if not body: return "" if "application/json" in content_type: try: # 统一 JSON 格式,便于检索。 return json.dumps(json.loads(body), ensure_ascii=False) except Exception: return body.decode("utf-8", errors="replace") return body.decode("utf-8", errors="replace")