"""深度测量通用能力。 该模块封装了深度相机初始化、深度帧预处理、ROI 计算、最近点提取等核心逻辑, 供 API 服务与本地调试脚本共用。 """ import os from dataclasses import dataclass import cv2 import numpy as np from pyorbbecsdk import Config, OBError, OBFormat, OBSensorType, Pipeline def _get_env_int(name, default): """读取整型环境变量。 - 当变量不存在或为空字符串时返回默认值; - 当变量无法转换为整数时返回默认值。 """ value = os.getenv(name) if value is None or value.strip() == "": return default try: return int(value) except ValueError: return default @dataclass(frozen=True) class Settings: """深度处理参数集合。""" # 有效深度区间(内部单位:毫米);超出范围的像素会被置为 0。 # 环境变量 MIN_DEPTH/MAX_DEPTH 的输入单位为厘米。 min_depth: int max_depth: int # 以图像中心为基准,计算物理尺寸为 roi_width_cm x roi_height_cm 的 ROI。 roi_width_cm: int roi_height_cm: int # 预处理参数:中值滤波核大小、形态学开运算核大小(建议为奇数)。 median_blur_ksize: int morph_open_ksize: int # 最近距离统计分位数;例如 5 表示取前 5% 的分位值,降低偶发噪声影响。 nearest_percentile: int @classmethod def from_env( cls, *, median_blur_ksize=5, morph_open_ksize=3, nearest_percentile=5, ): """从环境变量创建参数对象。""" min_depth_cm = _get_env_int("MIN_DEPTH", 50) max_depth_cm = _get_env_int("MAX_DEPTH", 400) return cls( min_depth=min_depth_cm * 10, max_depth=max_depth_cm * 10, roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10), roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 10), median_blur_ksize=median_blur_ksize, morph_open_ksize=morph_open_ksize, nearest_percentile=nearest_percentile, ) class TemporalFilter: """简易时间域平滑滤波器。 使用指数加权平均(EWMA)降低帧间抖动: current = alpha * new + (1 - alpha) * previous """ def __init__(self, alpha): # alpha 越大,结果越偏向当前帧;越小,结果越平滑。 self.alpha = alpha self.previous_frame = None def process(self, frame): """对单帧执行时间滤波并返回结果。""" if self.previous_frame is None: # 第一帧没有历史值,直接返回原始帧。 result = frame else: result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0) self.previous_frame = result return result def init_depth_pipeline(): """初始化深度相机管线并返回关键对象。 返回:`(pipeline, depth_intrinsics, depth_profile)`。 """ config = Config() pipeline = Pipeline() # 配置深度流(必须)。 profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR) if profile_list is None: raise RuntimeError("depth profile list is empty") depth_profile = profile_list.get_default_video_stream_profile() if depth_profile is None: raise RuntimeError("default depth profile is empty") depth_intrinsics = depth_profile.get_intrinsic() config.enable_stream(depth_profile) # 尝试配置彩色流(可选)。部分设备不支持彩色,失败时保留深度能力即可。 try: color_profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR) if color_profile_list is not None: color_profile = color_profile_list.get_default_video_stream_profile() if color_profile is not None: config.enable_stream(color_profile) except OBError: pass pipeline.start(config) return pipeline, depth_intrinsics, depth_profile def extract_depth_data(depth_frame, settings, temporal_filter): """从深度帧中提取并清洗深度矩阵(单位:毫米)。""" if depth_frame is None: return None if depth_frame.get_format() != OBFormat.Y16: # 当前逻辑仅处理 Y16 格式深度帧。 return None width = depth_frame.get_width() height = depth_frame.get_height() scale = depth_frame.get_depth_scale() # 1) 将原始缓冲区转为 2D 深度矩阵;2) 应用深度比例尺;3) 过滤无效深度范围。 depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16) depth_data = depth_data.reshape((height, width)) depth_data = depth_data.astype(np.float32) * scale depth_data = np.where( (depth_data > settings.min_depth) & (depth_data < settings.max_depth), depth_data, 0, ).astype(np.uint16) # 中值滤波:抑制椒盐噪声;核必须是奇数。 if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1: depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize) # 开运算:移除孤立噪点,保留连通区域。 if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1: kernel = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (settings.morph_open_ksize, settings.morph_open_ksize), ) valid_mask = (depth_data > 0).astype(np.uint8) valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel) depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16) # 时间滤波:进一步减小帧间波动。 if temporal_filter is not None: depth_data = temporal_filter.process(depth_data) return depth_data def compute_roi_bounds(depth_data, depth_intrinsics, settings): """计算中心 ROI 的像素边界。 逻辑说明: 1. 读取图像中心点深度作为当前距离参考; 2. 将目标物理尺寸(厘米)换算为该距离下的像素尺寸; 3. 将 ROI 裁剪到图像有效范围内。 """ height, width = depth_data.shape center_y = height // 2 center_x = width // 2 center_distance = int(depth_data[center_y, center_x]) if center_distance <= 0: return None center_distance_m = center_distance / 1000.0 if center_distance_m <= 0: return None half_width_m = (settings.roi_width_cm / 100) / 2 half_height_m = (settings.roi_height_cm / 100) / 2 # 使用相机内参把真实长度投影到像素平面。 half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m) half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m) if half_width_px <= 0 or half_height_px <= 0: return None # 防止 ROI 超出图像边界。 half_width_px = min(half_width_px, center_x, width - center_x - 1) half_height_px = min(half_height_px, center_y, height - center_y - 1) if half_width_px <= 0 or half_height_px <= 0: return None x_start = center_x - half_width_px x_end = center_x + half_width_px + 1 y_start = center_y - half_height_px y_end = center_y + half_height_px + 1 return x_start, x_end, y_start, y_end, center_distance def nearest_distance_in_roi(roi, settings): """计算 ROI 内最近距离。 默认返回有效像素最小值;若设置了分位数则返回对应分位值。 """ valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)] if valid_values.size == 0: return None if settings.nearest_percentile and 0 < settings.nearest_percentile < 100: return int(np.percentile(valid_values, settings.nearest_percentile)) return int(valid_values.min()) def find_nearest_point(roi, x_start, y_start, settings, nearest_distance): """在 ROI 中定位最近点,并返回其在整幅图中的坐标。""" if nearest_distance is None or nearest_distance <= 0: return None # 先把无效深度置为最大值,确保 argmin 不会选到无效像素。 roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth) roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max) # 使用分位数策略时,仅保留不大于 nearest_distance 的候选点。 if settings.nearest_percentile and 0 < settings.nearest_percentile < 100: roi_candidate = np.where( roi_candidate <= nearest_distance, roi_candidate, np.iinfo(np.uint16).max, ) min_idx = np.argmin(roi_candidate) min_val = roi_candidate.flat[min_idx] if min_val == np.iinfo(np.uint16).max: return None min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape) return x_start + min_x, y_start + min_y