minio_client_provider.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from typing import Dict, Tuple
  2. import io
  3. from minio import Minio
  4. from cachetools import TTLCache
  5. from app.services.bucket_config_service import bucket_config_service
  6. from app.core.config import settings
  7. # 缓存已初始化的 MinIO 客户端,key 为 f"{user_id}/{owner_code}"
  8. minio_client_cache: TTLCache = TTLCache(maxsize=100, ttl=3600)
  9. class MinioClientProvider:
  10. """
  11. 提供基于上下文的 MinIO 客户端获取能力,统一封装缓存与配置解析。
  12. 依赖 bucket_config_service 从 ContextVar 中读取 RequestContext。
  13. """
  14. def get_bucket_config_and_client(self) -> Tuple[Dict, Minio, str]:
  15. """
  16. 获取桶配置和 MinIO 客户端(带缓存),无需传参,直接使用上下文。
  17. 返回 (bucket_config, client, bucket_name)
  18. """
  19. # 解析上下文与配置
  20. current_ctx, user, owner_code = bucket_config_service.resolve_context()
  21. bucket_config = bucket_config_service.get_bucket_config(
  22. user=user,
  23. owner_code=owner_code,
  24. ctx=current_ctx,
  25. )
  26. cache_key = f"{user.id}/{owner_code}"
  27. # 缓存命中直接返回
  28. if cache_key in minio_client_cache:
  29. return bucket_config, minio_client_cache[cache_key], bucket_config["minioBucketName"]
  30. # 创建新的客户端
  31. secure = getattr(settings, "MINIO_SECURE", False)
  32. client = Minio(
  33. settings.MINIO_ENDPOINT,
  34. access_key=bucket_config["minioAccessKey"],
  35. secret_key=bucket_config["minioSecretKey"],
  36. secure=secure,
  37. )
  38. # 确保桶存在
  39. bucket_name = bucket_config["minioBucketName"]
  40. if not client.bucket_exists(bucket_name):
  41. client.make_bucket(bucket_name)
  42. # 写入缓存
  43. minio_client_cache[cache_key] = client
  44. return bucket_config, client, bucket_name
  45. # 基础 MinIO 操作封装,自动从上下文解析桶配置与客户端
  46. def list_objects(self, prefix: str = "", recursive: bool = False):
  47. _, client, bucket_name = self.get_bucket_config_and_client()
  48. return client.list_objects(bucket_name, prefix=prefix, recursive=recursive)
  49. def put_object(self, path: str, data, length: int, content_type: str):
  50. _, client, bucket_name = self.get_bucket_config_and_client()
  51. client.put_object(bucket_name, path, data, length=length, content_type=content_type)
  52. def put_empty_object(self, path: str):
  53. _, client, bucket_name = self.get_bucket_config_and_client()
  54. client.put_object(bucket_name, path, io.BytesIO(b""), 0)
  55. def get_object(self, path: str):
  56. _, client, bucket_name = self.get_bucket_config_and_client()
  57. return client.get_object(bucket_name, path)
  58. def remove_object(self, path: str):
  59. _, client, bucket_name = self.get_bucket_config_and_client()
  60. client.remove_object(bucket_name, path)
  61. def remove_objects_with_prefix(self, prefix: str):
  62. _, client, bucket_name = self.get_bucket_config_and_client()
  63. objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)
  64. for obj in objects:
  65. client.remove_object(bucket_name, obj.object_name)
  66. def get_presigned_url(self, path: str, expires):
  67. _, client, bucket_name = self.get_bucket_config_and_client()
  68. return client.get_presigned_url("GET", bucket_name, path, expires=expires)
  69. def get_storage_info(self) -> Dict:
  70. bucket_config, client, bucket_name = self.get_bucket_config_and_client()
  71. total_size = 0
  72. for obj in client.list_objects(bucket_name, recursive=True):
  73. total_size += obj.size
  74. quota_gb = bucket_config.get("quotaGb", 10)
  75. total_capacity = quota_gb * 1024 * 1024 * 1024
  76. used_percentage = (total_size / total_capacity) * 100 if total_capacity > 0 else 0
  77. return {
  78. "used_size": total_size,
  79. "total_capacity": total_capacity,
  80. "used_percentage": round(used_percentage, 1),
  81. "available_size": total_capacity - total_size,
  82. }
  83. # 单例
  84. minio_client_provider = MinioClientProvider()