# app/services/minio_service.py
import io
from datetime import timedelta
from typing import Any, Dict, List, Optional

from minio import Minio
from minio.error import S3Error

from app.core.config import settings
  7. class MinioService:
  8. def __init__(self):
  9. self.client = Minio(
  10. settings.MINIO_ENDPOINT,
  11. access_key=settings.MINIO_ACCESS_KEY,
  12. secret_key=settings.MINIO_SECRET_KEY,
  13. secure=settings.MINIO_SECURE
  14. )
  15. self.bucket = settings.MINIO_BUCKET_NAME
  16. self._ensure_bucket_exists()
  17. def _ensure_bucket_exists(self):
  18. if not self.client.bucket_exists(self.bucket):
  19. self.client.make_bucket(self.bucket)
  20. # 在 app/services/minio_service.py 中修复 list_files 方法
  21. def list_files(self, prefix: str = ""):
  22. # 清理前缀
  23. if prefix == "/":
  24. prefix = ""
  25. elif prefix.startswith("/"):
  26. prefix = prefix[1:]
  27. if prefix and not prefix.endswith("/"):
  28. prefix += "/"
  29. print(f"列出文件,前缀: '{prefix}'")
  30. try:
  31. objects = self.client.list_objects(self.bucket, prefix=prefix, recursive=False)
  32. results = []
  33. for obj in objects:
  34. # 跳过与前缀完全相同的对象(通常是文件夹标记)
  35. if obj.object_name == prefix:
  36. continue
  37. is_dir = obj.is_dir
  38. # 获取显示名称
  39. if is_dir:
  40. # 对于目录,去掉末尾的斜杠
  41. name = obj.object_name[len(prefix):].rstrip('/')
  42. else:
  43. name = obj.object_name[len(prefix):]
  44. # 跳过空名称
  45. if not name:
  46. continue
  47. results.append({
  48. "name": name,
  49. "path": obj.object_name,
  50. "is_dir": is_dir,
  51. "size": obj.size if not is_dir else 0,
  52. "last_modified": str(obj.last_modified) if not is_dir else None,
  53. "content_type": obj.content_type
  54. })
  55. # 排序
  56. results.sort(key=lambda x: (not x['is_dir'], x['name'].lower()))
  57. print(f"找到 {len(results)} 个文件/文件夹")
  58. return results
  59. except S3Error as e:
  60. print(f"列出文件错误: {e}")
  61. raise e
  62. # def list_files(self, prefix: str = ""):
  63. # # 处理前缀,确保以 / 结尾(除非是根目录)
  64. # if prefix == "/":
  65. # prefix = ""
  66. # elif prefix and not prefix.endswith("/"):
  67. # prefix += "/"
  68. #
  69. # objects = self.client.list_objects(self.bucket, prefix=prefix, recursive=False)
  70. #
  71. # results = []
  72. # for obj in objects:
  73. # # 跳过占位符对象
  74. # if obj.object_name == prefix:
  75. # continue
  76. #
  77. # is_dir = obj.is_dir
  78. # # 去掉前缀获取显示名称
  79. # name = obj.object_name[len(prefix):]
  80. # if is_dir:
  81. # name = name.rstrip("/")
  82. #
  83. # results.append({
  84. # "name": name,
  85. # "path": obj.object_name,
  86. # "is_dir": is_dir,
  87. # "size": obj.size if not is_dir else 0,
  88. # "last_modified": str(obj.last_modified) if not is_dir else None,
  89. # "content_type": obj.content_type
  90. # })
  91. #
  92. # # 排序:文件夹在前,然后按名称
  93. # results.sort(key=lambda x: (not x['is_dir'], x['name']))
  94. # return results
  95. def upload_file(self, file_obj, file_path: str, content_type: str, file_size: int):
  96. # file_obj 是 spooled temp file
  97. self.client.put_object(
  98. self.bucket,
  99. file_path,
  100. file_obj,
  101. length=file_size, # 如果无法获取确切大小,可以用 -1 并设置 part_size
  102. content_type=content_type
  103. )
  104. def create_folder(self, folder_path: str):
  105. if not folder_path.endswith("/"):
  106. folder_path += "/"
  107. # 创建一个空的 Object 以模拟文件夹
  108. self.client.put_object(
  109. self.bucket,
  110. folder_path,
  111. io.BytesIO(b""),
  112. 0
  113. )
  114. def get_file_stream(self, file_path: str):
  115. return self.client.get_object(self.bucket, file_path)
  116. def delete_file(self, path: str, is_dir: bool = False):
  117. if is_dir:
  118. # 递归删除
  119. objects_to_delete = self.client.list_objects(self.bucket, prefix=path, recursive=True)
  120. for obj in objects_to_delete:
  121. self.client.remove_object(self.bucket, obj.object_name)
  122. else:
  123. self.client.remove_object(self.bucket, path)
  124. def get_presigned_url(self, path: str):
  125. return self.client.get_presigned_url(
  126. "GET",
  127. self.bucket,
  128. path,
  129. expires=timedelta(hours=1)
  130. )
  131. def get_storage_info(self):
  132. """获取存储桶使用情况"""
  133. try:
  134. # 获取存储桶的总大小
  135. total_size = 0
  136. objects = self.client.list_objects(self.bucket, recursive=True)
  137. for obj in objects:
  138. total_size += obj.size
  139. # 这里可以设置存储桶的总容量(根据实际情况调整)
  140. total_capacity = 10 * 1024 * 1024 * 1024 # 10GB 示例
  141. used_percentage = (total_size / total_capacity) * 100 if total_capacity > 0 else 0
  142. return {
  143. "used_size": total_size,
  144. "total_capacity": total_capacity,
  145. "used_percentage": round(used_percentage, 1),
  146. "available_size": total_capacity - total_size
  147. }
  148. except S3Error as e:
  149. raise e
# Module-level singleton instance. Constructing it here runs
# MinioService.__init__ (client setup plus the bucket-existence check) as soon
# as this module is imported.
minio_service = MinioService()