import os from dataclasses import dataclass import cv2 import numpy as np from pyorbbecsdk import Config, Pipeline, OBSensorType, OBFormat def _get_env_int(name, default): # 从环境变量读取整数,失败时返回默认值 value = os.getenv(name) if value is None or value.strip() == "": return default try: return int(value) except ValueError: return default @dataclass(frozen=True) class Settings: # 深度数据处理参数 min_depth: int max_depth: int roi_width_cm: int roi_height_cm: int median_blur_ksize: int morph_open_ksize: int nearest_percentile: int @classmethod def from_env( cls, *, median_blur_ksize=5, morph_open_ksize=3, nearest_percentile=5, ): # 从环境变量构建配置 return cls( min_depth=_get_env_int("MIN_DEPTH", 500), max_depth=_get_env_int("MAX_DEPTH", 4000), roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10), roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 12), median_blur_ksize=median_blur_ksize, morph_open_ksize=morph_open_ksize, nearest_percentile=nearest_percentile, ) class TemporalFilter: def __init__(self, alpha): # alpha 越大越偏向当前帧 self.alpha = alpha self.previous_frame = None def process(self, frame): # 对连续帧做指数加权平滑 if self.previous_frame is None: result = frame else: result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0) self.previous_frame = result return result def init_depth_pipeline(): # 初始化相机并获取深度内参 config = Config() pipeline = Pipeline() profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR) if profile_list is None: raise RuntimeError("depth profile list is empty") depth_profile = profile_list.get_default_video_stream_profile() if depth_profile is None: raise RuntimeError("default depth profile is empty") depth_intrinsics = depth_profile.get_intrinsic() config.enable_stream(depth_profile) pipeline.start(config) return pipeline, depth_intrinsics, depth_profile def extract_depth_data(depth_frame, settings, temporal_filter): # 从原始帧中提取并滤波深度数据 if depth_frame is None: return None if depth_frame.get_format() != OBFormat.Y16: return None width = depth_frame.get_width() height = depth_frame.get_height() scale = depth_frame.get_depth_scale() # 读取深度数据并转换单位(毫米) depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16) depth_data = depth_data.reshape((height, width)) depth_data = depth_data.astype(np.float32) * scale depth_data = np.where( (depth_data > settings.min_depth) & (depth_data < settings.max_depth), depth_data, 0, ).astype(np.uint16) # 中值滤波与开运算,去除噪点 if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1: depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize) if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1: kernel = cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (settings.morph_open_ksize, settings.morph_open_ksize), ) valid_mask = (depth_data > 0).astype(np.uint8) valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel) depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16) # 可选的时间滤波 if temporal_filter is not None: depth_data = temporal_filter.process(depth_data) return depth_data def compute_roi_bounds(depth_data, depth_intrinsics, settings): # 根据中心点距离动态计算 ROI 的像素范围 height, width = depth_data.shape center_y = height // 2 center_x = width // 2 center_distance = int(depth_data[center_y, center_x]) if center_distance <= 0: return None center_distance_m = center_distance / 1000.0 if center_distance_m <= 0: return None half_width_m = (settings.roi_width_cm / 100) / 2 half_height_m = (settings.roi_height_cm / 100) / 2 half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m) half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m) if half_width_px <= 0 or half_height_px <= 0: return None half_width_px = min(half_width_px, center_x, width - center_x - 1) half_height_px = min(half_height_px, center_y, height - center_y - 1) if half_width_px <= 0 or half_height_px <= 0: return None x_start = center_x - half_width_px x_end = center_x + half_width_px + 1 y_start = center_y - half_height_px y_end = center_y + half_height_px + 1 return x_start, x_end, y_start, y_end, center_distance def nearest_distance_in_roi(roi, settings): # 计算 ROI 内的最近距离(或按百分位) valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)] if valid_values.size == 0: return None if settings.nearest_percentile and 0 < settings.nearest_percentile < 100: return int(np.percentile(valid_values, settings.nearest_percentile)) return int(valid_values.min()) def find_nearest_point(roi, x_start, y_start, settings, nearest_distance): # 返回最近点在整图中的坐标 if nearest_distance is None or nearest_distance <= 0: return None roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth) roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max) if settings.nearest_percentile and 0 < settings.nearest_percentile < 100: roi_candidate = np.where( roi_candidate <= nearest_distance, roi_candidate, np.iinfo(np.uint16).max, ) min_idx = np.argmin(roi_candidate) min_val = roi_candidate.flat[min_idx] if min_val == np.iinfo(np.uint16).max: return None min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape) return x_start + min_x, y_start + min_y