| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from typing import Dict, Tuple
- import io
- from minio import Minio
- from cachetools import TTLCache
- from app.services.bucket_config_service import bucket_config_service
- from app.core.config import settings
- # 缓存已初始化的 MinIO 客户端,key 为 f"{user_id}/{owner_code}"
- minio_client_cache: TTLCache = TTLCache(maxsize=100, ttl=3600)
- class MinioClientProvider:
- """
- 提供基于上下文的 MinIO 客户端获取能力,统一封装缓存与配置解析。
- 依赖 bucket_config_service 从 ContextVar 中读取 RequestContext。
- """
- def get_bucket_config_and_client(self) -> Tuple[Dict, Minio, str]:
- """
- 获取桶配置和 MinIO 客户端(带缓存),无需传参,直接使用上下文。
- 返回 (bucket_config, client, bucket_name)
- """
- # 解析上下文与配置
- current_ctx, user, owner_code = bucket_config_service.resolve_context()
- bucket_config = bucket_config_service.get_bucket_config(
- user=user,
- owner_code=owner_code,
- ctx=current_ctx,
- )
- cache_key = f"{user.id}/{owner_code}"
- # 缓存命中直接返回
- if cache_key in minio_client_cache:
- return bucket_config, minio_client_cache[cache_key], bucket_config["minioBucketName"]
- # 创建新的客户端
- secure = getattr(settings, "MINIO_SECURE", False)
- client = Minio(
- settings.MINIO_ENDPOINT,
- access_key=bucket_config["minioAccessKey"],
- secret_key=bucket_config["minioSecretKey"],
- secure=secure,
- )
- # 确保桶存在
- bucket_name = bucket_config["minioBucketName"]
- if not client.bucket_exists(bucket_name):
- client.make_bucket(bucket_name)
- # 写入缓存
- minio_client_cache[cache_key] = client
- return bucket_config, client, bucket_name
- # 基础 MinIO 操作封装,自动从上下文解析桶配置与客户端
- def list_objects(self, prefix: str = "", recursive: bool = False):
- _, client, bucket_name = self.get_bucket_config_and_client()
- return client.list_objects(bucket_name, prefix=prefix, recursive=recursive)
- def put_object(self, path: str, data, length: int, content_type: str):
- _, client, bucket_name = self.get_bucket_config_and_client()
- client.put_object(bucket_name, path, data, length=length, content_type=content_type)
- def put_empty_object(self, path: str):
- _, client, bucket_name = self.get_bucket_config_and_client()
- client.put_object(bucket_name, path, io.BytesIO(b""), 0)
- def get_object(self, path: str):
- _, client, bucket_name = self.get_bucket_config_and_client()
- return client.get_object(bucket_name, path)
- def remove_object(self, path: str):
- _, client, bucket_name = self.get_bucket_config_and_client()
- client.remove_object(bucket_name, path)
- def remove_objects_with_prefix(self, prefix: str):
- _, client, bucket_name = self.get_bucket_config_and_client()
- objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)
- for obj in objects:
- client.remove_object(bucket_name, obj.object_name)
- def get_presigned_url(self, path: str, expires):
- _, client, bucket_name = self.get_bucket_config_and_client()
- return client.get_presigned_url("GET", bucket_name, path, expires=expires)
- def get_storage_info(self) -> Dict:
- bucket_config, client, bucket_name = self.get_bucket_config_and_client()
- total_size = 0
- for obj in client.list_objects(bucket_name, recursive=True):
- total_size += obj.size
- quota_gb = bucket_config.get("quotaGb", 10)
- total_capacity = quota_gb * 1024 * 1024 * 1024
- used_percentage = (total_size / total_capacity) * 100 if total_capacity > 0 else 0
- return {
- "used_size": total_size,
- "total_capacity": total_capacity,
- "used_percentage": round(used_percentage, 1),
- "available_size": total_capacity - total_size,
- }
- # 单例
- minio_client_provider = MinioClientProvider()
|