depth_common.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. """深度测量通用能力。
  2. 该模块封装了深度相机初始化、深度帧预处理、ROI 计算、最近点提取等核心逻辑,
  3. 供 API 服务与本地调试脚本共用。
  4. """
  5. import os
  6. from dataclasses import dataclass
  7. import cv2
  8. import numpy as np
  9. from pyorbbecsdk import Config, OBError, OBFormat, OBSensorType, Pipeline
  10. def _get_env_int(name, default):
  11. """读取整型环境变量。
  12. - 当变量不存在或为空字符串时返回默认值;
  13. - 当变量无法转换为整数时返回默认值。
  14. """
  15. value = os.getenv(name)
  16. if value is None or value.strip() == "":
  17. return default
  18. try:
  19. return int(value)
  20. except ValueError:
  21. return default
  22. @dataclass(frozen=True)
  23. class Settings:
  24. """深度处理参数集合。"""
  25. # 有效深度区间(毫米);超出范围的像素会被置为 0。
  26. min_depth: int
  27. max_depth: int
  28. # 以图像中心为基准,计算物理尺寸为 roi_width_cm x roi_height_cm 的 ROI。
  29. roi_width_cm: int
  30. roi_height_cm: int
  31. # 预处理参数:中值滤波核大小、形态学开运算核大小(建议为奇数)。
  32. median_blur_ksize: int
  33. morph_open_ksize: int
  34. # 最近距离统计分位数;例如 5 表示取前 5% 的分位值,降低偶发噪声影响。
  35. nearest_percentile: int
  36. @classmethod
  37. def from_env(
  38. cls,
  39. *,
  40. median_blur_ksize=5,
  41. morph_open_ksize=3,
  42. nearest_percentile=5,
  43. ):
  44. """从环境变量创建参数对象。"""
  45. return cls(
  46. min_depth=_get_env_int("MIN_DEPTH", 500),
  47. max_depth=_get_env_int("MAX_DEPTH", 4000),
  48. roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
  49. roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 12),
  50. median_blur_ksize=median_blur_ksize,
  51. morph_open_ksize=morph_open_ksize,
  52. nearest_percentile=nearest_percentile,
  53. )
  54. class TemporalFilter:
  55. """简易时间域平滑滤波器。
  56. 使用指数加权平均(EWMA)降低帧间抖动:
  57. current = alpha * new + (1 - alpha) * previous
  58. """
  59. def __init__(self, alpha):
  60. # alpha 越大,结果越偏向当前帧;越小,结果越平滑。
  61. self.alpha = alpha
  62. self.previous_frame = None
  63. def process(self, frame):
  64. """对单帧执行时间滤波并返回结果。"""
  65. if self.previous_frame is None:
  66. # 第一帧没有历史值,直接返回原始帧。
  67. result = frame
  68. else:
  69. result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
  70. self.previous_frame = result
  71. return result
  72. def init_depth_pipeline():
  73. """初始化深度相机管线并返回关键对象。
  74. 返回:`(pipeline, depth_intrinsics, depth_profile)`。
  75. """
  76. config = Config()
  77. pipeline = Pipeline()
  78. # 配置深度流(必须)。
  79. profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
  80. if profile_list is None:
  81. raise RuntimeError("depth profile list is empty")
  82. depth_profile = profile_list.get_default_video_stream_profile()
  83. if depth_profile is None:
  84. raise RuntimeError("default depth profile is empty")
  85. depth_intrinsics = depth_profile.get_intrinsic()
  86. config.enable_stream(depth_profile)
  87. # 尝试配置彩色流(可选)。部分设备不支持彩色,失败时保留深度能力即可。
  88. try:
  89. color_profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR)
  90. if color_profile_list is not None:
  91. color_profile = color_profile_list.get_default_video_stream_profile()
  92. if color_profile is not None:
  93. config.enable_stream(color_profile)
  94. except OBError:
  95. pass
  96. pipeline.start(config)
  97. return pipeline, depth_intrinsics, depth_profile
  98. def extract_depth_data(depth_frame, settings, temporal_filter):
  99. """从深度帧中提取并清洗深度矩阵(单位:毫米)。"""
  100. if depth_frame is None:
  101. return None
  102. if depth_frame.get_format() != OBFormat.Y16:
  103. # 当前逻辑仅处理 Y16 格式深度帧。
  104. return None
  105. width = depth_frame.get_width()
  106. height = depth_frame.get_height()
  107. scale = depth_frame.get_depth_scale()
  108. # 1) 将原始缓冲区转为 2D 深度矩阵;2) 应用深度比例尺;3) 过滤无效深度范围。
  109. depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
  110. depth_data = depth_data.reshape((height, width))
  111. depth_data = depth_data.astype(np.float32) * scale
  112. depth_data = np.where(
  113. (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
  114. depth_data,
  115. 0,
  116. ).astype(np.uint16)
  117. # 中值滤波:抑制椒盐噪声;核必须是奇数。
  118. if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
  119. depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
  120. # 开运算:移除孤立噪点,保留连通区域。
  121. if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
  122. kernel = cv2.getStructuringElement(
  123. cv2.MORPH_ELLIPSE,
  124. (settings.morph_open_ksize, settings.morph_open_ksize),
  125. )
  126. valid_mask = (depth_data > 0).astype(np.uint8)
  127. valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
  128. depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
  129. # 时间滤波:进一步减小帧间波动。
  130. if temporal_filter is not None:
  131. depth_data = temporal_filter.process(depth_data)
  132. return depth_data
  133. def compute_roi_bounds(depth_data, depth_intrinsics, settings):
  134. """计算中心 ROI 的像素边界。
  135. 逻辑说明:
  136. 1. 读取图像中心点深度作为当前距离参考;
  137. 2. 将目标物理尺寸(厘米)换算为该距离下的像素尺寸;
  138. 3. 将 ROI 裁剪到图像有效范围内。
  139. """
  140. height, width = depth_data.shape
  141. center_y = height // 2
  142. center_x = width // 2
  143. center_distance = int(depth_data[center_y, center_x])
  144. if center_distance <= 0:
  145. return None
  146. center_distance_m = center_distance / 1000.0
  147. if center_distance_m <= 0:
  148. return None
  149. half_width_m = (settings.roi_width_cm / 100) / 2
  150. half_height_m = (settings.roi_height_cm / 100) / 2
  151. # 使用相机内参把真实长度投影到像素平面。
  152. half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
  153. half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
  154. if half_width_px <= 0 or half_height_px <= 0:
  155. return None
  156. # 防止 ROI 超出图像边界。
  157. half_width_px = min(half_width_px, center_x, width - center_x - 1)
  158. half_height_px = min(half_height_px, center_y, height - center_y - 1)
  159. if half_width_px <= 0 or half_height_px <= 0:
  160. return None
  161. x_start = center_x - half_width_px
  162. x_end = center_x + half_width_px + 1
  163. y_start = center_y - half_height_px
  164. y_end = center_y + half_height_px + 1
  165. return x_start, x_end, y_start, y_end, center_distance
  166. def nearest_distance_in_roi(roi, settings):
  167. """计算 ROI 内最近距离。
  168. 默认返回有效像素最小值;若设置了分位数则返回对应分位值。
  169. """
  170. valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
  171. if valid_values.size == 0:
  172. return None
  173. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  174. return int(np.percentile(valid_values, settings.nearest_percentile))
  175. return int(valid_values.min())
  176. def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
  177. """在 ROI 中定位最近点,并返回其在整幅图中的坐标。"""
  178. if nearest_distance is None or nearest_distance <= 0:
  179. return None
  180. # 先把无效深度置为最大值,确保 argmin 不会选到无效像素。
  181. roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
  182. roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
  183. # 使用分位数策略时,仅保留不大于 nearest_distance 的候选点。
  184. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  185. roi_candidate = np.where(
  186. roi_candidate <= nearest_distance,
  187. roi_candidate,
  188. np.iinfo(np.uint16).max,
  189. )
  190. min_idx = np.argmin(roi_candidate)
  191. min_val = roi_candidate.flat[min_idx]
  192. if min_val == np.iinfo(np.uint16).max:
  193. return None
  194. min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
  195. return x_start + min_x, y_start + min_y