|
|
@@ -1,14 +1,23 @@
|
|
|
+"""深度测量通用能力。
|
|
|
+
|
|
|
+该模块封装了深度相机初始化、深度帧预处理、ROI 计算、最近点提取等核心逻辑,
|
|
|
+供 API 服务与本地调试脚本共用。
|
|
|
+"""
|
|
|
+
|
|
|
import os
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
-
|
|
|
-from pyorbbecsdk import Config, Pipeline, OBSensorType, OBFormat, OBError
|
|
|
+from pyorbbecsdk import Config, OBError, OBFormat, OBSensorType, Pipeline
|
|
|
|
|
|
|
|
|
def _get_env_int(name, default):
|
|
|
- # 从环境变量读取整数,失败时返回默认值
|
|
|
+ """读取整型环境变量。
|
|
|
+
|
|
|
+ - 当变量不存在或为空字符串时返回默认值;
|
|
|
+ - 当变量无法转换为整数时返回默认值。
|
|
|
+ """
|
|
|
value = os.getenv(name)
|
|
|
if value is None or value.strip() == "":
|
|
|
return default
|
|
|
@@ -20,13 +29,21 @@ def _get_env_int(name, default):
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
class Settings:
|
|
|
- # 深度数据处理参数
|
|
|
+ """深度处理参数集合。"""
|
|
|
+
|
|
|
+ # 有效深度区间(毫米);超出范围的像素会被置为 0。
|
|
|
min_depth: int
|
|
|
max_depth: int
|
|
|
+
|
|
|
+ # 以图像中心为基准,计算物理尺寸为 roi_width_cm x roi_height_cm 的 ROI。
|
|
|
roi_width_cm: int
|
|
|
roi_height_cm: int
|
|
|
+
|
|
|
+ # 预处理参数:中值滤波核大小、形态学开运算核大小(建议为奇数)。
|
|
|
median_blur_ksize: int
|
|
|
morph_open_ksize: int
|
|
|
+
|
|
|
+ # 最近距离统计分位数;例如 5 表示取前 5% 的分位值,降低偶发噪声影响。
|
|
|
nearest_percentile: int
|
|
|
|
|
|
@classmethod
|
|
|
@@ -37,7 +54,7 @@ class Settings:
|
|
|
morph_open_ksize=3,
|
|
|
nearest_percentile=5,
|
|
|
):
|
|
|
- # 从环境变量构建配置
|
|
|
+ """从环境变量创建参数对象。"""
|
|
|
return cls(
|
|
|
min_depth=_get_env_int("MIN_DEPTH", 500),
|
|
|
max_depth=_get_env_int("MAX_DEPTH", 4000),
|
|
|
@@ -50,14 +67,21 @@ class Settings:
|
|
|
|
|
|
|
|
|
class TemporalFilter:
|
|
|
+ """简易时间域平滑滤波器。
|
|
|
+
|
|
|
+ 使用指数加权平均(EWMA)降低帧间抖动:
|
|
|
+ current = alpha * new + (1 - alpha) * previous
|
|
|
+ """
|
|
|
+
|
|
|
def __init__(self, alpha):
|
|
|
- # alpha 越大越偏向当前帧
|
|
|
+ # alpha 越大,结果越偏向当前帧;越小,结果越平滑。
|
|
|
self.alpha = alpha
|
|
|
self.previous_frame = None
|
|
|
|
|
|
def process(self, frame):
|
|
|
- # 对连续帧做指数加权平滑
|
|
|
+ """对单帧执行时间滤波并返回结果。"""
|
|
|
if self.previous_frame is None:
|
|
|
+ # 第一帧没有历史值,直接返回原始帧。
|
|
|
result = frame
|
|
|
else:
|
|
|
result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
|
|
|
@@ -66,9 +90,14 @@ class TemporalFilter:
|
|
|
|
|
|
|
|
|
def init_depth_pipeline():
|
|
|
- # 初始化相机并获取深度内参
|
|
|
+ """初始化深度相机管线并返回关键对象。
|
|
|
+
|
|
|
+ 返回:`(pipeline, depth_intrinsics, depth_profile)`。
|
|
|
+ """
|
|
|
config = Config()
|
|
|
pipeline = Pipeline()
|
|
|
+
|
|
|
+ # 配置深度流(必须)。
|
|
|
profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
|
|
|
if profile_list is None:
|
|
|
raise RuntimeError("depth profile list is empty")
|
|
|
@@ -78,7 +107,7 @@ def init_depth_pipeline():
|
|
|
depth_intrinsics = depth_profile.get_intrinsic()
|
|
|
config.enable_stream(depth_profile)
|
|
|
|
|
|
- # Optionally enable color stream so callers can access color frames when available.
|
|
|
+ # 尝试配置彩色流(可选)。部分设备不支持彩色,失败时保留深度能力即可。
|
|
|
try:
|
|
|
color_profile_list = pipeline.get_stream_profile_list(OBSensorType.COLOR_SENSOR)
|
|
|
if color_profile_list is not None:
|
|
|
@@ -86,7 +115,6 @@ def init_depth_pipeline():
|
|
|
if color_profile is not None:
|
|
|
config.enable_stream(color_profile)
|
|
|
except OBError:
|
|
|
- # Some devices do not provide color sensors; keep depth-only behavior.
|
|
|
pass
|
|
|
|
|
|
pipeline.start(config)
|
|
|
@@ -94,16 +122,18 @@ def init_depth_pipeline():
|
|
|
|
|
|
|
|
|
def extract_depth_data(depth_frame, settings, temporal_filter):
|
|
|
- # 从原始帧中提取并滤波深度数据
|
|
|
+ """从深度帧中提取并清洗深度矩阵(单位:毫米)。"""
|
|
|
if depth_frame is None:
|
|
|
return None
|
|
|
if depth_frame.get_format() != OBFormat.Y16:
|
|
|
+ # 当前逻辑仅处理 Y16 格式深度帧。
|
|
|
return None
|
|
|
+
|
|
|
width = depth_frame.get_width()
|
|
|
height = depth_frame.get_height()
|
|
|
scale = depth_frame.get_depth_scale()
|
|
|
|
|
|
- # 读取深度数据并转换单位(毫米)
|
|
|
+ # 1) 将原始缓冲区转为 2D 深度矩阵;2) 应用深度比例尺;3) 过滤无效深度范围。
|
|
|
depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
|
|
|
depth_data = depth_data.reshape((height, width))
|
|
|
depth_data = depth_data.astype(np.float32) * scale
|
|
|
@@ -113,9 +143,11 @@ def extract_depth_data(depth_frame, settings, temporal_filter):
|
|
|
0,
|
|
|
).astype(np.uint16)
|
|
|
|
|
|
- # 中值滤波与开运算,去除噪点
|
|
|
+ # 中值滤波:抑制椒盐噪声;核必须是奇数。
|
|
|
if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
|
|
|
depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
|
|
|
+
|
|
|
+ # 开运算:移除孤立噪点,保留连通区域。
|
|
|
if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
|
|
|
kernel = cv2.getStructuringElement(
|
|
|
cv2.MORPH_ELLIPSE,
|
|
|
@@ -125,7 +157,7 @@ def extract_depth_data(depth_frame, settings, temporal_filter):
|
|
|
valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
|
|
|
depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
|
|
|
|
|
|
- # 可选的时间滤波
|
|
|
+ # 时间滤波:进一步减小帧间波动。
|
|
|
if temporal_filter is not None:
|
|
|
depth_data = temporal_filter.process(depth_data)
|
|
|
|
|
|
@@ -133,26 +165,40 @@ def extract_depth_data(depth_frame, settings, temporal_filter):
|
|
|
|
|
|
|
|
|
def compute_roi_bounds(depth_data, depth_intrinsics, settings):
|
|
|
- # 根据中心点距离动态计算 ROI 的像素范围
|
|
|
+ """计算中心 ROI 的像素边界。
|
|
|
+
|
|
|
+ 逻辑说明:
|
|
|
+ 1. 读取图像中心点深度作为当前距离参考;
|
|
|
+ 2. 将目标物理尺寸(厘米)换算为该距离下的像素尺寸;
|
|
|
+ 3. 将 ROI 裁剪到图像有效范围内。
|
|
|
+ """
|
|
|
height, width = depth_data.shape
|
|
|
center_y = height // 2
|
|
|
center_x = width // 2
|
|
|
+
|
|
|
center_distance = int(depth_data[center_y, center_x])
|
|
|
if center_distance <= 0:
|
|
|
return None
|
|
|
+
|
|
|
center_distance_m = center_distance / 1000.0
|
|
|
if center_distance_m <= 0:
|
|
|
return None
|
|
|
+
|
|
|
half_width_m = (settings.roi_width_cm / 100) / 2
|
|
|
half_height_m = (settings.roi_height_cm / 100) / 2
|
|
|
+
|
|
|
+ # 使用相机内参把真实长度投影到像素平面。
|
|
|
half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
|
|
|
half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
|
|
|
if half_width_px <= 0 or half_height_px <= 0:
|
|
|
return None
|
|
|
+
|
|
|
+ # 防止 ROI 超出图像边界。
|
|
|
half_width_px = min(half_width_px, center_x, width - center_x - 1)
|
|
|
half_height_px = min(half_height_px, center_y, height - center_y - 1)
|
|
|
if half_width_px <= 0 or half_height_px <= 0:
|
|
|
return None
|
|
|
+
|
|
|
x_start = center_x - half_width_px
|
|
|
x_end = center_x + half_width_px + 1
|
|
|
y_start = center_y - half_height_px
|
|
|
@@ -161,7 +207,10 @@ def compute_roi_bounds(depth_data, depth_intrinsics, settings):
|
|
|
|
|
|
|
|
|
def nearest_distance_in_roi(roi, settings):
|
|
|
- # 计算 ROI 内的最近距离(或按百分位)
|
|
|
+ """计算 ROI 内最近距离。
|
|
|
+
|
|
|
+ 默认返回有效像素最小值;若设置了分位数则返回对应分位值。
|
|
|
+ """
|
|
|
valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
|
|
|
if valid_values.size == 0:
|
|
|
return None
|
|
|
@@ -171,20 +220,26 @@ def nearest_distance_in_roi(roi, settings):
|
|
|
|
|
|
|
|
|
def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
|
|
|
- # 返回最近点在整图中的坐标
|
|
|
+ """在 ROI 中定位最近点,并返回其在整幅图中的坐标。"""
|
|
|
if nearest_distance is None or nearest_distance <= 0:
|
|
|
return None
|
|
|
+
|
|
|
+ # 先把无效深度置为最大值,确保 argmin 不会选到无效像素。
|
|
|
roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
|
|
|
roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
|
|
|
+
|
|
|
+ # 使用分位数策略时,仅保留不大于 nearest_distance 的候选点。
|
|
|
if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
|
|
|
roi_candidate = np.where(
|
|
|
roi_candidate <= nearest_distance,
|
|
|
roi_candidate,
|
|
|
np.iinfo(np.uint16).max,
|
|
|
)
|
|
|
+
|
|
|
min_idx = np.argmin(roi_candidate)
|
|
|
min_val = roi_candidate.flat[min_idx]
|
|
|
if min_val == np.iinfo(np.uint16).max:
|
|
|
return None
|
|
|
+
|
|
|
min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
|
|
|
return x_start + min_x, y_start + min_y
|