depth_common.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import os
  2. from dataclasses import dataclass
  3. import cv2
  4. import numpy as np
  5. from pyorbbecsdk import Config, Pipeline, OBSensorType, OBFormat
  6. def _get_env_int(name, default):
  7. # 从环境变量读取整数,失败时返回默认值
  8. value = os.getenv(name)
  9. if value is None or value.strip() == "":
  10. return default
  11. try:
  12. return int(value)
  13. except ValueError:
  14. return default
  15. @dataclass(frozen=True)
  16. class Settings:
  17. # 深度数据处理参数
  18. min_depth: int
  19. max_depth: int
  20. roi_width_cm: int
  21. roi_height_cm: int
  22. median_blur_ksize: int
  23. morph_open_ksize: int
  24. nearest_percentile: int
  25. @classmethod
  26. def from_env(
  27. cls,
  28. *,
  29. median_blur_ksize=5,
  30. morph_open_ksize=3,
  31. nearest_percentile=5,
  32. ):
  33. # 从环境变量构建配置
  34. return cls(
  35. min_depth=_get_env_int("MIN_DEPTH", 500),
  36. max_depth=_get_env_int("MAX_DEPTH", 4000),
  37. roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
  38. roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 12),
  39. median_blur_ksize=median_blur_ksize,
  40. morph_open_ksize=morph_open_ksize,
  41. nearest_percentile=nearest_percentile,
  42. )
  43. class TemporalFilter:
  44. def __init__(self, alpha):
  45. # alpha 越大越偏向当前帧
  46. self.alpha = alpha
  47. self.previous_frame = None
  48. def process(self, frame):
  49. # 对连续帧做指数加权平滑
  50. if self.previous_frame is None:
  51. result = frame
  52. else:
  53. result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
  54. self.previous_frame = result
  55. return result
  56. def init_depth_pipeline():
  57. # 初始化相机并获取深度内参
  58. config = Config()
  59. pipeline = Pipeline()
  60. profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
  61. if profile_list is None:
  62. raise RuntimeError("depth profile list is empty")
  63. depth_profile = profile_list.get_default_video_stream_profile()
  64. if depth_profile is None:
  65. raise RuntimeError("default depth profile is empty")
  66. depth_intrinsics = depth_profile.get_intrinsic()
  67. config.enable_stream(depth_profile)
  68. pipeline.start(config)
  69. return pipeline, depth_intrinsics, depth_profile
  70. def extract_depth_data(depth_frame, settings, temporal_filter):
  71. # 从原始帧中提取并滤波深度数据
  72. if depth_frame is None:
  73. return None
  74. if depth_frame.get_format() != OBFormat.Y16:
  75. return None
  76. width = depth_frame.get_width()
  77. height = depth_frame.get_height()
  78. scale = depth_frame.get_depth_scale()
  79. # 读取深度数据并转换单位(毫米)
  80. depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
  81. depth_data = depth_data.reshape((height, width))
  82. depth_data = depth_data.astype(np.float32) * scale
  83. depth_data = np.where(
  84. (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
  85. depth_data,
  86. 0,
  87. ).astype(np.uint16)
  88. # 中值滤波与开运算,去除噪点
  89. if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
  90. depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
  91. if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
  92. kernel = cv2.getStructuringElement(
  93. cv2.MORPH_ELLIPSE,
  94. (settings.morph_open_ksize, settings.morph_open_ksize),
  95. )
  96. valid_mask = (depth_data > 0).astype(np.uint8)
  97. valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
  98. depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
  99. # 可选的时间滤波
  100. if temporal_filter is not None:
  101. depth_data = temporal_filter.process(depth_data)
  102. return depth_data
  103. def compute_roi_bounds(depth_data, depth_intrinsics, settings):
  104. # 根据中心点距离动态计算 ROI 的像素范围
  105. height, width = depth_data.shape
  106. center_y = height // 2
  107. center_x = width // 2
  108. center_distance = int(depth_data[center_y, center_x])
  109. if center_distance <= 0:
  110. return None
  111. center_distance_m = center_distance / 1000.0
  112. if center_distance_m <= 0:
  113. return None
  114. half_width_m = (settings.roi_width_cm / 100) / 2
  115. half_height_m = (settings.roi_height_cm / 100) / 2
  116. half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
  117. half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
  118. if half_width_px <= 0 or half_height_px <= 0:
  119. return None
  120. half_width_px = min(half_width_px, center_x, width - center_x - 1)
  121. half_height_px = min(half_height_px, center_y, height - center_y - 1)
  122. if half_width_px <= 0 or half_height_px <= 0:
  123. return None
  124. x_start = center_x - half_width_px
  125. x_end = center_x + half_width_px + 1
  126. y_start = center_y - half_height_px
  127. y_end = center_y + half_height_px + 1
  128. return x_start, x_end, y_start, y_end, center_distance
  129. def nearest_distance_in_roi(roi, settings):
  130. # 计算 ROI 内的最近距离(或按百分位)
  131. valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
  132. if valid_values.size == 0:
  133. return None
  134. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  135. return int(np.percentile(valid_values, settings.nearest_percentile))
  136. return int(valid_values.min())
  137. def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
  138. # 返回最近点在整图中的坐标
  139. if nearest_distance is None or nearest_distance <= 0:
  140. return None
  141. roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
  142. roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
  143. if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
  144. roi_candidate = np.where(
  145. roi_candidate <= nearest_distance,
  146. roi_candidate,
  147. np.iinfo(np.uint16).max,
  148. )
  149. min_idx = np.argmin(roi_candidate)
  150. min_val = roi_candidate.flat[min_idx]
  151. if min_val == np.iinfo(np.uint16).max:
  152. return None
  153. min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
  154. return x_start + min_x, y_start + min_y