# app/services/minio_service.py
import io
import logging
from datetime import timedelta

from minio import Minio
from minio.error import S3Error

from app.core.config import settings
logger = logging.getLogger(__name__)


class MinioService:
    """File-manager style operations on a single MinIO bucket.

    Object keys are treated as ``/``-separated paths. A "folder" is either a
    common key prefix reported by the server or a zero-byte placeholder object
    whose key ends with ``/`` (see ``create_folder``).
    """

    # Part size handed to ``put_object`` when the upload length is unknown.
    _UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

    # Nominal bucket capacity (10 GiB) used by ``get_storage_info`` when the
    # caller does not supply one. It is an example figure, not a value read
    # from the server.
    DEFAULT_CAPACITY_BYTES = 10 * 1024 * 1024 * 1024

    def __init__(self):
        """Build the client from application settings and ensure the bucket exists."""
        self.client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=settings.MINIO_SECURE,
        )
        self.bucket = settings.MINIO_BUCKET_NAME
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self) -> None:
        """Create the configured bucket if it is not already present."""
        if not self.client.bucket_exists(self.bucket):
            self.client.make_bucket(self.bucket)

    @staticmethod
    def _normalize_prefix(path: str) -> str:
        """Return *path* as a folder prefix: no leading ``/``, one trailing ``/``.

        The root (``""``, ``"/"``, or any run of slashes) maps to ``""``.
        """
        trimmed = path.lstrip("/")
        if trimmed and not trimmed.endswith("/"):
            trimmed += "/"
        return trimmed

    def list_files(self, prefix: str = "") -> list:
        """List the direct children of a folder.

        Args:
            prefix: Folder path; leading slashes are ignored and a trailing
                slash is added when missing. ``""`` or ``"/"`` lists the root.

        Returns:
            A list of dicts with keys ``name``, ``path``, ``is_dir``, ``size``,
            ``last_modified`` and ``content_type``, folders first, then sorted
            case-insensitively by name.

        Raises:
            S3Error: Propagated from the MinIO client after being logged.
        """
        prefix = self._normalize_prefix(prefix)
        logger.debug("列出文件,前缀: '%s'", prefix)

        results = []
        try:
            # list_objects is consumed inside the try block so that errors
            # raised while iterating are logged as well.
            for obj in self.client.list_objects(
                self.bucket, prefix=prefix, recursive=False
            ):
                # An object whose key equals the prefix is the folder's own
                # placeholder, not one of its children.
                if obj.object_name == prefix:
                    continue

                is_dir = obj.is_dir
                name = obj.object_name[len(prefix):]
                if is_dir:
                    # Folders are displayed without their trailing slash.
                    name = name.rstrip("/")
                if not name:
                    continue

                results.append({
                    "name": name,
                    "path": obj.object_name,
                    "is_dir": is_dir,
                    "size": obj.size if not is_dir else 0,
                    "last_modified": str(obj.last_modified) if not is_dir else None,
                    "content_type": obj.content_type,
                })
        except S3Error as e:
            logger.error("列出文件错误: %s", e)
            raise

        # Folders first, then case-insensitive by display name.
        results.sort(key=lambda item: (not item["is_dir"], item["name"].lower()))
        logger.debug("找到 %d 个文件/文件夹", len(results))
        return results

    def upload_file(self, file_obj, file_path: str, content_type: str, file_size: int) -> None:
        """Upload a readable binary stream as object *file_path*.

        Args:
            file_obj: Readable binary file-like object (e.g. a spooled temp file).
            file_path: Destination object key.
            content_type: MIME type stored with the object.
            file_size: Exact byte length of the stream. ``None`` or a negative
                value means "unknown"; the upload then uses ``length=-1`` with
                a fixed part size.
        """
        if file_size is None or file_size < 0:
            self.client.put_object(
                self.bucket,
                file_path,
                file_obj,
                length=-1,
                part_size=self._UNKNOWN_LENGTH_PART_SIZE,
                content_type=content_type,
            )
            return

        self.client.put_object(
            self.bucket,
            file_path,
            file_obj,
            length=file_size,
            content_type=content_type,
        )

    def create_folder(self, folder_path: str) -> None:
        """Create an empty placeholder object that represents a folder.

        Leading slashes are dropped and a trailing slash is added, so the key
        has the same shape as the prefixes ``list_files`` queries.

        Raises:
            ValueError: If *folder_path* contains nothing but slashes (or is
                empty), since there would be no folder name to create.
        """
        folder_key = self._normalize_prefix(folder_path)
        if not folder_key:
            raise ValueError("folder_path must contain a folder name")

        # A zero-byte object whose key ends with "/" stands in for the folder.
        self.client.put_object(self.bucket, folder_key, io.BytesIO(b""), 0)

    def get_file_stream(self, file_path: str):
        """Return the client's response object for *file_path*.

        NOTE(review): per the MinIO client documentation the caller should
        ``close()`` and ``release_conn()`` the returned response when done;
        confirm that callers do so.
        """
        return self.client.get_object(self.bucket, file_path)

    def delete_file(self, path: str, is_dir: bool = False) -> None:
        """Delete one object, or every object under a folder.

        Args:
            path: Object key, or folder path when *is_dir* is true.
            is_dir: When true, delete recursively everything whose key starts
                with the folder prefix.
        """
        if not is_dir:
            self.client.remove_object(self.bucket, path)
            return

        # Terminate the prefix with "/" so that deleting "docs" cannot also
        # match sibling keys such as "docs-old/..." or "docs.txt". An empty
        # path is left untouched and still matches the whole bucket.
        prefix = path
        if prefix and not prefix.endswith("/"):
            prefix += "/"

        for obj in self.client.list_objects(self.bucket, prefix=prefix, recursive=True):
            self.client.remove_object(self.bucket, obj.object_name)

    def get_presigned_url(self, path: str) -> str:
        """Return a presigned GET URL for *path* that expires after one hour."""
        return self.client.get_presigned_url(
            "GET",
            self.bucket,
            path,
            expires=timedelta(hours=1),
        )

    def get_storage_info(self, total_capacity: int = DEFAULT_CAPACITY_BYTES) -> dict:
        """Summarise bucket usage against a nominal capacity.

        Args:
            total_capacity: Capacity in bytes to measure usage against.
                Defaults to ``DEFAULT_CAPACITY_BYTES``.

        Returns:
            A dict with ``used_size`` (sum of all object sizes in bytes),
            ``total_capacity``, ``used_percentage`` (rounded to one decimal,
            may exceed 100) and ``available_size`` (never negative).

        Raises:
            S3Error: Propagated from the MinIO client.
        """
        objects = self.client.list_objects(self.bucket, recursive=True)
        # Entries without a size are counted as zero bytes.
        total_size = sum(obj.size or 0 for obj in objects)

        if total_capacity > 0:
            used_percentage = (total_size / total_capacity) * 100
        else:
            used_percentage = 0

        return {
            "used_size": total_size,
            "total_capacity": total_capacity,
            "used_percentage": round(used_percentage, 1),
            # Clamp so an over-full bucket does not report negative free space.
            "available_size": max(total_capacity - total_size, 0),
        }
# Module-level singleton shared by importers. Constructing it checks for the
# configured bucket and creates it when missing.
minio_service = MinioService()