| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- import os
- from dataclasses import dataclass
- import cv2
- import numpy as np
- from pyorbbecsdk import Config, Pipeline, OBSensorType, OBFormat
- def _get_env_int(name, default):
- # 从环境变量读取整数,失败时返回默认值
- value = os.getenv(name)
- if value is None or value.strip() == "":
- return default
- try:
- return int(value)
- except ValueError:
- return default
- @dataclass(frozen=True)
- class Settings:
- # 深度数据处理参数
- min_depth: int
- max_depth: int
- roi_width_cm: int
- roi_height_cm: int
- median_blur_ksize: int
- morph_open_ksize: int
- nearest_percentile: int
- @classmethod
- def from_env(
- cls,
- *,
- median_blur_ksize=5,
- morph_open_ksize=3,
- nearest_percentile=5,
- ):
- # 从环境变量构建配置
- return cls(
- min_depth=_get_env_int("MIN_DEPTH", 500),
- max_depth=_get_env_int("MAX_DEPTH", 4000),
- roi_width_cm=_get_env_int("ROI_WIDTH_CM", 10),
- roi_height_cm=_get_env_int("ROI_HEIGHT_CM", 12),
- median_blur_ksize=median_blur_ksize,
- morph_open_ksize=morph_open_ksize,
- nearest_percentile=nearest_percentile,
- )
- class TemporalFilter:
- def __init__(self, alpha):
- # alpha 越大越偏向当前帧
- self.alpha = alpha
- self.previous_frame = None
- def process(self, frame):
- # 对连续帧做指数加权平滑
- if self.previous_frame is None:
- result = frame
- else:
- result = cv2.addWeighted(frame, self.alpha, self.previous_frame, 1 - self.alpha, 0)
- self.previous_frame = result
- return result
- def init_depth_pipeline():
- # 初始化相机并获取深度内参
- config = Config()
- pipeline = Pipeline()
- profile_list = pipeline.get_stream_profile_list(OBSensorType.DEPTH_SENSOR)
- if profile_list is None:
- raise RuntimeError("depth profile list is empty")
- depth_profile = profile_list.get_default_video_stream_profile()
- if depth_profile is None:
- raise RuntimeError("default depth profile is empty")
- depth_intrinsics = depth_profile.get_intrinsic()
- config.enable_stream(depth_profile)
- pipeline.start(config)
- return pipeline, depth_intrinsics, depth_profile
- def extract_depth_data(depth_frame, settings, temporal_filter):
- # 从原始帧中提取并滤波深度数据
- if depth_frame is None:
- return None
- if depth_frame.get_format() != OBFormat.Y16:
- return None
- width = depth_frame.get_width()
- height = depth_frame.get_height()
- scale = depth_frame.get_depth_scale()
- # 读取深度数据并转换单位(毫米)
- depth_data = np.frombuffer(depth_frame.get_data(), dtype=np.uint16)
- depth_data = depth_data.reshape((height, width))
- depth_data = depth_data.astype(np.float32) * scale
- depth_data = np.where(
- (depth_data > settings.min_depth) & (depth_data < settings.max_depth),
- depth_data,
- 0,
- ).astype(np.uint16)
- # 中值滤波与开运算,去除噪点
- if settings.median_blur_ksize and settings.median_blur_ksize % 2 == 1:
- depth_data = cv2.medianBlur(depth_data, settings.median_blur_ksize)
- if settings.morph_open_ksize and settings.morph_open_ksize % 2 == 1:
- kernel = cv2.getStructuringElement(
- cv2.MORPH_ELLIPSE,
- (settings.morph_open_ksize, settings.morph_open_ksize),
- )
- valid_mask = (depth_data > 0).astype(np.uint8)
- valid_mask = cv2.morphologyEx(valid_mask, cv2.MORPH_OPEN, kernel)
- depth_data = np.where(valid_mask > 0, depth_data, 0).astype(np.uint16)
- # 可选的时间滤波
- if temporal_filter is not None:
- depth_data = temporal_filter.process(depth_data)
- return depth_data
- def compute_roi_bounds(depth_data, depth_intrinsics, settings):
- # 根据中心点距离动态计算 ROI 的像素范围
- height, width = depth_data.shape
- center_y = height // 2
- center_x = width // 2
- center_distance = int(depth_data[center_y, center_x])
- if center_distance <= 0:
- return None
- center_distance_m = center_distance / 1000.0
- if center_distance_m <= 0:
- return None
- half_width_m = (settings.roi_width_cm / 100) / 2
- half_height_m = (settings.roi_height_cm / 100) / 2
- half_width_px = int(depth_intrinsics.fx * half_width_m / center_distance_m)
- half_height_px = int(depth_intrinsics.fy * half_height_m / center_distance_m)
- if half_width_px <= 0 or half_height_px <= 0:
- return None
- half_width_px = min(half_width_px, center_x, width - center_x - 1)
- half_height_px = min(half_height_px, center_y, height - center_y - 1)
- if half_width_px <= 0 or half_height_px <= 0:
- return None
- x_start = center_x - half_width_px
- x_end = center_x + half_width_px + 1
- y_start = center_y - half_height_px
- y_end = center_y + half_height_px + 1
- return x_start, x_end, y_start, y_end, center_distance
- def nearest_distance_in_roi(roi, settings):
- # 计算 ROI 内的最近距离(或按百分位)
- valid_values = roi[(roi >= settings.min_depth) & (roi <= settings.max_depth)]
- if valid_values.size == 0:
- return None
- if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
- return int(np.percentile(valid_values, settings.nearest_percentile))
- return int(valid_values.min())
- def find_nearest_point(roi, x_start, y_start, settings, nearest_distance):
- # 返回最近点在整图中的坐标
- if nearest_distance is None or nearest_distance <= 0:
- return None
- roi_mask = (roi >= settings.min_depth) & (roi <= settings.max_depth)
- roi_candidate = np.where(roi_mask, roi, np.iinfo(np.uint16).max)
- if settings.nearest_percentile and 0 < settings.nearest_percentile < 100:
- roi_candidate = np.where(
- roi_candidate <= nearest_distance,
- roi_candidate,
- np.iinfo(np.uint16).max,
- )
- min_idx = np.argmin(roi_candidate)
- min_val = roi_candidate.flat[min_idx]
- if min_val == np.iinfo(np.uint16).max:
- return None
- min_y, min_x = np.unravel_index(min_idx, roi_candidate.shape)
- return x_start + min_x, y_start + min_y
|