depth_common.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. """深度测量通用能力。
  2. 该模块封装了深度相机初始化、深度帧预处理、ROI 计算、最近点提取等核心逻辑,
  3. 供 API 服务与本地调试脚本共用。
  4. """
  5. import os
  6. from dataclasses import dataclass
  7. import cv2
  8. import numpy as np
  9. from pyorbbecsdk import Config, OBError, OBFormat, OBSensorType, Pipeline
  10. def _get_env_int(name, default):
  11. """读取整型环境变量。
  12. - 当变量不存在或为空字符串时返回默认值;
  13. - 当变量无法转换为整数时返回默认值。
  14. """
  15. value = os.getenv(name)
  16. if value is None or value.strip() == "":
  17. return default
  18. try:
  19. return int(value)
  20. except ValueError:
  21. return default
  22. @dataclass(frozen=True)
  23. class Settings:
  24. """深度处理参数集合。"""
  25. # 有效深度区间(内部单位:毫米);超出范围的像素会被置为 0。
  26. # 环境变量 MIN_DEPTH/MAX_DEPTH 的输入单位为厘米。
  27. min_depth: int
  28. max_depth: int
  29. # 以图像中心为基准,计算物理尺寸为 roi_width_cm x roi_height_cm 的 ROI。
  30. roi_width_cm: int
  31. roi_height_cm: int
  32. # 预处理参数:中值滤波核大小、形态学开运算核大小(建议为奇数)。
  33. median_blur_ksize: int
  34. morph_open_ksize: int
  35. # 最近距离统计分位数;例如 5 表示取前 5% 的分位值,降低偶发噪声影响。
  36. nearest_percentile: int
  37. @classmethod
  38. def from_env(
  39. cls,
  40. *,
  41. median_blur_ksize=5,
  42. morph_open_ksize=3,
  43. nearest_percentile=5,
  44. ):
  45. """从环境变量创建参数对象。"""
  46. min_depth_cm = _get_env_int("MIN_DEPTH", 50)
  47. max_depth_cm = _get_env_int("MAX_DEPTH", 400)
  48. return cls(
  49. min_depth=min_depth_cm * 10,
  50. max_depth=max_depth_cm * 10,
  51. roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
  52. roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 10),
  53. median_blur_ksize=median_blur_ksize,
  54. morph_open_ksize=morph_open_ksize,
  55. nearest_percentile=nearest_percentile,
  56. )
  57. class TemporalFilter:
  58. """简易时间域平滑滤波器。
  59. 使用指数加权平均(EWMA)降低帧间抖动:
  60. current = alpha * new + (1 - alpha) * previous
  61. """
  62. def __init__(self, alpha):
  63. # alpha 越大,结果越偏向当前帧;越小,结果越平滑。
  64. self.alpha = alpha
  65. self.previous_frame = None
  66. def process(self, frame):
  67. """对单帧执行时间滤波并返回结果。"""
  68. if self.previous_frame is None:
  69. # 第一帧没有历史值,直接返回原始帧。
  70. result = frame
  71. else:
  72. result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
  73. self.previous_frame = result
  74. return result
  75. def init_depth_pipeline():
  76. """初始化深度相机管线并返回关键对象。
  77. 返回:`(pipeline, depth_intrinsics, depth_profile)`。
  78. """
  79. config = Config()
  80. pipeline = Pipeline()
  81. # 配置深度流(必须)。
  82. profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
  83. if profile_list is None:
  84. raise RuntimeError("depth profile list is empty")
  85. depth_profile = profile_list.get_default_video_stream_profile()
  86. if depth_profile is None:
  87. raise RuntimeError("default depth profile is empty")
  88. depth_intrinsics = depth_profile.get_intrinsic()
  89. config.enable_stream(depth_profile)
  90. # 尝试配置彩色流(可选)。部分设备不支持彩色,失败时保留深度能力即可。
  91. try:
  92. color_profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR)
  93. if color_profile_list is not None:
  94. color_profile = color_profile_list.get_default_video_stream_profile()
  95. if color_profile is not None:
  96. config.enable_stream(color_profile)
  97. except OBError:
  98. pass
  99. pipeline.start(config)
  100. return pipeline, depth_intrinsics, depth_profile
  101. def extract_depth_data(depth_frame, settings, temporal_filter):
  102. """从深度帧中提取并清洗深度矩阵(单位:毫米)。"""
  103. if depth_frame is None:
  104. return None
  105. if depth_frame.get_format() != OBFormat.Y16:
  106. # 当前逻辑仅处理 Y16 格式深度帧。
  107. return None
  108. width = depth_frame.get_width()
  109. height = depth_frame.get_height()
  110. scale = depth_frame.get_depth_scale()
  111. # 1) 将原始缓冲区转为 2D 深度矩阵;2) 应用深度比例尺;3) 过滤无效深度范围。
  112. depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
  113. depth_data = depth_data.reshape((height, width))
  114. depth_data = depth_data.astype(np.float32) * scale
  115. depth_data = np.where(
  116. (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
  117. depth_data,
  118. 0,
  119. ).astype(np.uint16)
  120. # 中值滤波:抑制椒盐噪声;核必须是奇数。
  121. if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
  122. depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
  123. # 开运算:移除孤立噪点,保留连通区域。
  124. if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
  125. kernel = cv2.getStructuringElement(
  126. cv2.MORPH_ELLIPSE,
  127. (settings.morph_open_ksize, settings.morph_open_ksize),
  128. )
  129. valid_mask = (depth_data > 0).astype(np.uint8)
  130. valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
  131. depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
  132. # 时间滤波:进一步减小帧间波动。
  133. if temporal_filter is not None:
  134. depth_data = temporal_filter.process(depth_data)
  135. return depth_data
  136. def compute_roi_bounds(depth_data, depth_intrinsics, settings):
  137. """计算中心 ROI 的像素边界。
  138. 逻辑说明:
  139. 1. 读取图像中心点深度作为当前距离参考;
  140. 2. 将目标物理尺寸(厘米)换算为该距离下的像素尺寸;
  141. 3. 将 ROI 裁剪到图像有效范围内。
  142. """
  143. height, width = depth_data.shape
  144. center_y = height // 2
  145. center_x = width // 2
  146. center_distance = int(depth_data[center_y, center_x])
  147. if center_distance <= 0:
  148. return None
  149. center_distance_m = center_distance / 1000.0
  150. if center_distance_m <= 0:
  151. return None
  152. half_width_m = (settings.roi_width_cm / 100) / 2
  153. half_height_m = (settings.roi_height_cm / 100) / 2
  154. # 使用相机内参把真实长度投影到像素平面。
  155. half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
  156. half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
  157. if half_width_px <= 0 or half_height_px <= 0:
  158. return None
  159. # 防止 ROI 超出图像边界。
  160. half_width_px = min(half_width_px, center_x, width - center_x - 1)
  161. half_height_px = min(half_height_px, center_y, height - center_y - 1)
  162. if half_width_px <= 0 or half_height_px <= 0:
  163. return None
  164. x_start = center_x - half_width_px
  165. x_end = center_x + half_width_px + 1
  166. y_start = center_y - half_height_px
  167. y_end = center_y + half_height_px + 1
  168. return x_start, x_end, y_start, y_end, center_distance
  169. def nearest_distance_in_roi(roi, settings):
  170. """计算 ROI 内最近距离。
  171. 默认返回有效像素最小值;若设置了分位数则返回对应分位值。
  172. """
  173. valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
  174. if valid_values.size == 0:
  175. return None
  176. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  177. return int(np.percentile(valid_values, settings.nearest_percentile))
  178. return int(valid_values.min())
  179. def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
  180. """在 ROI 中定位最近点,并返回其在整幅图中的坐标。"""
  181. if nearest_distance is None or nearest_distance <= 0:
  182. return None
  183. # 先把无效深度置为最大值,确保 argmin 不会选到无效像素。
  184. roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
  185. roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
  186. # 使用分位数策略时,仅保留不大于 nearest_distance 的候选点。
  187. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  188. roi_candidate = np.where(
  189. roi_candidate <= nearest_distance,
  190. roi_candidate,
  191. np.iinfo(np.uint16).max,
  192. )
  193. min_idx = np.argmin(roi_candidate)
  194. min_val = roi_candidate.flat[min_idx]
  195. if min_val == np.iinfo(np.uint16).max:
  196. return None
  197. min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
  198. return x_start + min_x, y_start + min_y