| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- """深度测量通用能力。
- 该模块封装了深度相机初始化、深度帧预处理、ROI 计算、最近点提取等核心逻辑,
- 供 API 服务与本地调试脚本共用。
- """
- import os
- from dataclasses import dataclass
- import cv2
- import numpy as np
- from pyorbbecsdk import Config, OBError, OBFormat, OBSensorType, Pipeline
- def _get_env_int(name, default):
- """读取整型环境变量。
- - 当变量不存在或为空字符串时返回默认值;
- - 当变量无法转换为整数时返回默认值。
- """
- value = os.getenv(name)
- if value is None or value.strip() == "":
- return default
- try:
- return int(value)
- except ValueError:
- return default
- @dataclass(frozen=True)
- class Settings:
- """深度处理参数集合。"""
- # 有效深度区间(内部单位:毫米);超出范围的像素会被置为 0。
- # 环境变量 MIN_DEPTH/MAX_DEPTH 的输入单位为厘米。
- min_depth: int
- max_depth: int
- # 以图像中心为基准,计算物理尺寸为 roi_width_cm x roi_height_cm 的 ROI。
- roi_width_cm: int
- roi_height_cm: int
- # 预处理参数:中值滤波核大小、形态学开运算核大小(建议为奇数)。
- median_blur_ksize: int
- morph_open_ksize: int
- # 最近距离统计分位数;例如 5 表示取前 5% 的分位值,降低偶发噪声影响。
- nearest_percentile: int
- @classmethod
- def from_env(
- cls,
- *,
- median_blur_ksize=5,
- morph_open_ksize=3,
- nearest_percentile=5,
- ):
- """从环境变量创建参数对象。"""
- min_depth_cm = _get_env_int("MIN_DEPTH", 50)
- max_depth_cm = _get_env_int("MAX_DEPTH", 400)
- return cls(
- min_depth=min_depth_cm * 10,
- max_depth=max_depth_cm * 10,
- roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
- roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 10),
- median_blur_ksize=median_blur_ksize,
- morph_open_ksize=morph_open_ksize,
- nearest_percentile=nearest_percentile,
- )
- class TemporalFilter:
- """简易时间域平滑滤波器。
- 使用指数加权平均(EWMA)降低帧间抖动:
- current = alpha * new + (1 - alpha) * previous
- """
- def __init__(self, alpha):
- # alpha 越大,结果越偏向当前帧;越小,结果越平滑。
- self.alpha = alpha
- self.previous_frame = None
- def process(self, frame):
- """对单帧执行时间滤波并返回结果。"""
- if self.previous_frame is None:
- # 第一帧没有历史值,直接返回原始帧。
- result = frame
- else:
- result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
- self.previous_frame = result
- return result
- def init_depth_pipeline():
- """初始化深度相机管线并返回关键对象。
- 返回:`(pipeline, depth_intrinsics, depth_profile)`。
- """
- config = Config()
- pipeline = Pipeline()
- # 配置深度流(必须)。
- profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
- if profile_list is None:
- raise RuntimeError("depth profile list is empty")
- depth_profile = profile_list.get_default_video_stream_profile()
- if depth_profile is None:
- raise RuntimeError("default depth profile is empty")
- depth_intrinsics = depth_profile.get_intrinsic()
- config.enable_stream(depth_profile)
- # 尝试配置彩色流(可选)。部分设备不支持彩色,失败时保留深度能力即可。
- try:
- color_profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR)
- if color_profile_list is not None:
- color_profile = color_profile_list.get_default_video_stream_profile()
- if color_profile is not None:
- config.enable_stream(color_profile)
- except OBError:
- pass
- pipeline.start(config)
- return pipeline, depth_intrinsics, depth_profile
- def extract_depth_data(depth_frame, settings, temporal_filter):
- """从深度帧中提取并清洗深度矩阵(单位:毫米)。"""
- if depth_frame is None:
- return None
- if depth_frame.get_format() != OBFormat.Y16:
- # 当前逻辑仅处理 Y16 格式深度帧。
- return None
- width = depth_frame.get_width()
- height = depth_frame.get_height()
- scale = depth_frame.get_depth_scale()
- # 1) 将原始缓冲区转为 2D 深度矩阵;2) 应用深度比例尺;3) 过滤无效深度范围。
- depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
- depth_data = depth_data.reshape((height, width))
- depth_data = depth_data.astype(np.float32) * scale
- depth_data = np.where(
- (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
- depth_data,
- 0,
- ).astype(np.uint16)
- # 中值滤波:抑制椒盐噪声;核必须是奇数。
- if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
- depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
- # 开运算:移除孤立噪点,保留连通区域。
- if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
- kernel = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE,
- (settings.morph_open_ksize, settings.morph_open_ksize),
- )
- valid_mask = (depth_data > 0).astype(np.uint8)
- valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
- depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
- # 时间滤波:进一步减小帧间波动。
- if temporal_filter is not None:
- depth_data = temporal_filter.process(depth_data)
- return depth_data
- def compute_roi_bounds(depth_data, depth_intrinsics, settings):
- """计算中心 ROI 的像素边界。
- 逻辑说明:
- 1. 读取图像中心点深度作为当前距离参考;
- 2. 将目标物理尺寸(厘米)换算为该距离下的像素尺寸;
- 3. 将 ROI 裁剪到图像有效范围内。
- """
- height, width = depth_data.shape
- center_y = height // 2
- center_x = width // 2
- center_distance = int(depth_data[center_y, center_x])
- if center_distance <= 0:
- return None
- center_distance_m = center_distance / 1000.0
- if center_distance_m <= 0:
- return None
- half_width_m = (settings.roi_width_cm / 100) / 2
- half_height_m = (settings.roi_height_cm / 100) / 2
- # 使用相机内参把真实长度投影到像素平面。
- half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
- half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
- if half_width_px <= 0 or half_height_px <= 0:
- return None
- # 防止 ROI 超出图像边界。
- half_width_px = min(half_width_px, center_x, width - center_x - 1)
- half_height_px = min(half_height_px, center_y, height - center_y - 1)
- if half_width_px <= 0 or half_height_px <= 0:
- return None
- x_start = center_x - half_width_px
- x_end = center_x + half_width_px + 1
- y_start = center_y - half_height_px
- y_end = center_y + half_height_px + 1
- return x_start, x_end, y_start, y_end, center_distance
- def nearest_distance_in_roi(roi, settings):
- """计算 ROI 内最近距离。
- 默认返回有效像素最小值;若设置了分位数则返回对应分位值。
- """
- valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
- if valid_values.size == 0:
- return None
- if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
- return int(np.percentile(valid_values, settings.nearest_percentile))
- return int(valid_values.min())
- def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
- """在 ROI 中定位最近点,并返回其在整幅图中的坐标。"""
- if nearest_distance is None or nearest_distance <= 0:
- return None
- # 先把无效深度置为最大值,确保 argmin 不会选到无效像素。
- roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
- roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
- # 使用分位数策略时,仅保留不大于 nearest_distance 的候选点。
- if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
- roi_candidate = np.where(
- roi_candidate <= nearest_distance,
- roi_candidate,
- np.iinfo(np.uint16).max,
- )
- min_idx = np.argmin(roi_candidate)
- min_val = roi_candidate.flat[min_idx]
- if min_val == np.iinfo(np.uint16).max:
- return None
- min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
- return x_start + min_x, y_start + min_y
|