depth_common.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import os
  2. from dataclasses import dataclass
  3. import cv2
  4. import numpy as np
  5. from pyorbbecsdk import Config, Pipeline, OBSensorType, OBFormat, OBError
  6. def _get_env_int(name, default):
  7. # 从环境变量读取整数,失败时返回默认值
  8. value = os.getenv(name)
  9. if value is None or value.strip() == "":
  10. return default
  11. try:
  12. return int(value)
  13. except ValueError:
  14. return default
  15. @dataclass(frozen=True)
  16. class Settings:
  17. # 深度数据处理参数
  18. min_depth: int
  19. max_depth: int
  20. roi_width_cm: int
  21. roi_height_cm: int
  22. median_blur_ksize: int
  23. morph_open_ksize: int
  24. nearest_percentile: int
  25. @classmethod
  26. def from_env(
  27. cls,
  28. *,
  29. median_blur_ksize=5,
  30. morph_open_ksize=3,
  31. nearest_percentile=5,
  32. ):
  33. # 从环境变量构建配置
  34. return cls(
  35. min_depth=_get_env_int("MIN_DEPTH", 500),
  36. max_depth=_get_env_int("MAX_DEPTH", 4000),
  37. roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
  38. roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 12),
  39. median_blur_ksize=median_blur_ksize,
  40. morph_open_ksize=morph_open_ksize,
  41. nearest_percentile=nearest_percentile,
  42. )
  43. class TemporalFilter:
  44. def __init__(self, alpha):
  45. # alpha 越大越偏向当前帧
  46. self.alpha = alpha
  47. self.previous_frame = None
  48. def process(self, frame):
  49. # 对连续帧做指数加权平滑
  50. if self.previous_frame is None:
  51. result = frame
  52. else:
  53. result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
  54. self.previous_frame = result
  55. return result
  56. def init_depth_pipeline():
  57. # 初始化相机并获取深度内参
  58. config = Config()
  59. pipeline = Pipeline()
  60. profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
  61. if profile_list is None:
  62. raise RuntimeError("depth profile list is empty")
  63. depth_profile = profile_list.get_default_video_stream_profile()
  64. if depth_profile is None:
  65. raise RuntimeError("default depth profile is empty")
  66. depth_intrinsics = depth_profile.get_intrinsic()
  67. config.enable_stream(depth_profile)
  68. # Optionally enable color stream so callers can access color frames when available.
  69. try:
  70. color_profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR)
  71. if color_profile_list is not None:
  72. color_profile = color_profile_list.get_default_video_stream_profile()
  73. if color_profile is not None:
  74. config.enable_stream(color_profile)
  75. except OBError:
  76. # Some devices do not provide color sensors; keep depth-only behavior.
  77. pass
  78. pipeline.start(config)
  79. return pipeline, depth_intrinsics, depth_profile
  80. def extract_depth_data(depth_frame, settings, temporal_filter):
  81. # 从原始帧中提取并滤波深度数据
  82. if depth_frame is None:
  83. return None
  84. if depth_frame.get_format() != OBFormat.Y16:
  85. return None
  86. width = depth_frame.get_width()
  87. height = depth_frame.get_height()
  88. scale = depth_frame.get_depth_scale()
  89. # 读取深度数据并转换单位(毫米)
  90. depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
  91. depth_data = depth_data.reshape((height, width))
  92. depth_data = depth_data.astype(np.float32) * scale
  93. depth_data = np.where(
  94. (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
  95. depth_data,
  96. 0,
  97. ).astype(np.uint16)
  98. # 中值滤波与开运算,去除噪点
  99. if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
  100. depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
  101. if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
  102. kernel = cv2.getStructuringElement(
  103. cv2.MORPH_ELLIPSE,
  104. (settings.morph_open_ksize, settings.morph_open_ksize),
  105. )
  106. valid_mask = (depth_data > 0).astype(np.uint8)
  107. valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
  108. depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
  109. # 可选的时间滤波
  110. if temporal_filter is not None:
  111. depth_data = temporal_filter.process(depth_data)
  112. return depth_data
  113. def compute_roi_bounds(depth_data, depth_intrinsics, settings):
  114. # 根据中心点距离动态计算 ROI 的像素范围
  115. height, width = depth_data.shape
  116. center_y = height // 2
  117. center_x = width // 2
  118. center_distance = int(depth_data[center_y, center_x])
  119. if center_distance <= 0:
  120. return None
  121. center_distance_m = center_distance / 1000.0
  122. if center_distance_m <= 0:
  123. return None
  124. half_width_m = (settings.roi_width_cm / 100) / 2
  125. half_height_m = (settings.roi_height_cm / 100) / 2
  126. half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
  127. half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
  128. if half_width_px <= 0 or half_height_px <= 0:
  129. return None
  130. half_width_px = min(half_width_px, center_x, width - center_x - 1)
  131. half_height_px = min(half_height_px, center_y, height - center_y - 1)
  132. if half_width_px <= 0 or half_height_px <= 0:
  133. return None
  134. x_start = center_x - half_width_px
  135. x_end = center_x + half_width_px + 1
  136. y_start = center_y - half_height_px
  137. y_end = center_y + half_height_px + 1
  138. return x_start, x_end, y_start, y_end, center_distance
  139. def nearest_distance_in_roi(roi, settings):
  140. # 计算 ROI 内的最近距离(或按百分位)
  141. valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
  142. if valid_values.size == 0:
  143. return None
  144. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  145. return int(np.percentile(valid_values, settings.nearest_percentile))
  146. return int(valid_values.min())
  147. def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
  148. # 返回最近点在整图中的坐标
  149. if nearest_distance is None or nearest_distance <= 0:
  150. return None
  151. roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
  152. roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
  153. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  154. roi_candidate = np.where(
  155. roi_candidate <= nearest_distance,
  156. roi_candidate,
  157. np.iinfo(np.uint16).max,
  158. )
  159. min_idx = np.argmin(roi_candidate)
  160. min_val = roi_candidate.flat[min_idx]
  161. if min_val == np.iinfo(np.uint16).max:
  162. return None
  163. min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
  164. return x_start + min_x, y_start + min_y